You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:32 UTC
[11/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index bc942ac..52bb0a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -17,9 +17,9 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
-import com.google.protobuf.Internal.EnumLite;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -27,23 +27,32 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import org.apache.drill.exec.exception.DrillbitStartupException;
import java.io.IOException;
import java.net.BindException;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
/**
* A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
* requests will generate more than one outbound request.
*/
-public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
+public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
private ServerBootstrap b;
private volatile boolean connect = false;
+ private final EventLoopGroup eventLoopGroup;
- public BasicServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-
+ public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ super(rpcMapping);
+ this.eventLoopGroup = eventLoopGroup;
+
b = new ServerBootstrap() //
.channel(NioServerSocketChannel.class) //
.option(ChannelOption.SO_BACKLOG, 100) //
@@ -56,17 +65,19 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
- ch.closeFuture().addListener(getCloseHandler(ch));
+ C connection = initRemoteConnection(ch);
+ ch.closeFuture().addListener(getCloseHandler(connection));
ch.pipeline().addLast( //
new ZeroCopyProtobufLengthDecoder(), //
- new RpcDecoder(), //
- new RpcEncoder(), //
- new InboundHandler(ch), //
+ new RpcDecoder(rpcConfig.getName()), //
+ new RpcEncoder(rpcConfig.getName()), //
+ getHandshakeHandler(),
+ new InboundHandler(connection), //
new RpcExceptionHandler() //
);
- channel = ch;
connect = true;
+
}
});
}
@@ -76,12 +87,34 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
return false;
}
+
+ protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
+ protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
+
+ public ServerHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
+ super(handshakeType, parser);
+ }
+
+ @Override
+ protected final void consumeHandshake(Channel c, T inbound) throws Exception {
+ OutboundRpcMessage msg = new OutboundRpcMessage(RpcMode.RESPONSE, this.handshakeType, coordinationId, getHandshakeResponse(inbound));
+ c.write(msg);
+ }
+
+ public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
+
+
+
+
+ }
+
+
public int bind(final int initialPort) throws InterruptedException, DrillbitStartupException{
- int port = initialPort;
+ int port = initialPort-1;
while (true) {
try {
- b.bind(port++).sync();
+ b.bind(++port).sync();
break;
} catch (Exception e) {
if (e instanceof BindException)
@@ -89,13 +122,15 @@ public abstract class BasicServer<T extends EnumLite> extends RpcBus<T>{
throw new DrillbitStartupException("Could not bind Drillbit", e);
}
}
+
connect = !connect;
+ logger.debug("Server started on port {} of type {} ", port, this.getClass().getSimpleName());
return port;
}
@Override
public void close() throws IOException {
- if(b != null) b.shutdown();
+ eventLoopGroup.shutdownGracefully();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index c796e2d..70142bb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -29,21 +29,21 @@ public class CoordinationQueue {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
- private final Map<Integer, DrillRpcFuture<?>> map;
+ private final Map<Integer, DrillRpcFutureImpl<?>> map;
public CoordinationQueue(int segmentSize, int segmentCount) {
- map = new ConcurrentHashMap<Integer, DrillRpcFuture<?>>(segmentSize, 0.75f, segmentCount);
+ map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
}
void channelClosed(Exception ex) {
- for (DrillRpcFuture<?> f : map.values()) {
+ for (DrillRpcFutureImpl<?> f : map.values()) {
f.setException(ex);
}
}
- public <V> DrillRpcFuture<V> getNewFuture(Class<V> clazz) {
+ public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
int i = circularInt.getNext();
- DrillRpcFuture<V> future = DrillRpcFuture.getNewFuture(i, clazz);
+ DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
// logger.debug("Writing to map coord {}, future {}", i, future);
Object old = map.put(i, future);
if (old != null)
@@ -52,8 +52,8 @@ public class CoordinationQueue {
return future;
}
- private DrillRpcFuture<?> removeFromMap(int coordinationId) {
- DrillRpcFuture<?> rpc = map.remove(coordinationId);
+ private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
+ DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
if (rpc == null) {
logger.error("Rpc is null.");
throw new IllegalStateException(
@@ -62,23 +62,25 @@ public class CoordinationQueue {
return rpc;
}
- public <V> DrillRpcFuture<V> getFuture(int coordinationId, Class<V> clazz) {
+ public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
// logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
- DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+ DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
// logger.debug("Got rpc from map {}", rpc);
Class<?> outcomeClass = rpc.getOutcomeClass();
+
if (outcomeClass != clazz) {
- logger.error("Rpc class is not expected class. Original: {}, requested: {}", outcomeClass.getCanonicalName(), clazz.getCanonicalName());
+
throw new IllegalStateException(
String
.format(
- "You attempted to request a future for a coordination id that has a different value class than was used when you "
- + "initially created the coordination id. Requested class %s, originally expected class %s. This shouldn't happen. ",
- clazz.getCanonicalName(), outcomeClass.getCanonicalName()));
+ "RPC Engine had a submission and response configuration mismatch. The RPC request that you submitted was defined with an expected response type of %s. However, "
+ + "when the response returned, a call to getResponseDefaultInstance() with Rpc number %d provided an expected class of %s. This means either your submission uses the wrong type definition"
+ + "or your getResponseDefaultInstance() method responds the wrong instance type ",
+ clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
}
@SuppressWarnings("unchecked")
- DrillRpcFuture<V> crpc = (DrillRpcFuture<V>) rpc;
+ DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
// logger.debug("Returning casted future");
return crpc;
@@ -86,7 +88,7 @@ public class CoordinationQueue {
public void updateFailedFuture(int coordinationId, RpcFailure failure) {
// logger.debug("Updating failed future.");
- DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+ DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
rpc.setException(new RemoteRpcException(failure));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index 9a4a7f7..bae947a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -17,80 +17,10 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
-import java.util.concurrent.ExecutionException;
+import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class DrillRpcFuture<V> extends AbstractCheckedFuture<V, RpcException> {
+public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
- final int coordinationId;
- private final Class<V> clazz;
-
- public DrillRpcFuture(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
- super(delegate);
- this.coordinationId = coordinationId;
- this.clazz = clazz;
- }
-
- public Class<V> getOutcomeClass(){
- return clazz;
- }
-
- /**
- * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
- * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
- * will result in an UnsupportedOperationException.
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- throw new UnsupportedOperationException(
- "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
- }
-
- @Override
- protected RpcException mapException(Exception ex) {
- if (ex instanceof RpcException) return (RpcException) ex;
-
- if (ex instanceof ExecutionException) {
- Throwable e2 = ex.getCause();
-
- if (e2 instanceof RpcException) {
- return (RpcException) e2;
- }
- }
- return new RpcException(ex);
-
- }
-
- @SuppressWarnings("unchecked")
- void setValue(Object value) {
- assert clazz.isAssignableFrom(value.getClass());
- ((InnerFuture<V>) super.delegate()).setValue((V) value);
- }
-
- boolean setException(Throwable t) {
- return ((InnerFuture<V>) super.delegate()).setException(t);
- }
-
- public static class InnerFuture<T> extends AbstractFuture<T> {
- // we rewrite these so that the parent can see them
-
- void setValue(T value) {
- super.set(value);
- }
-
- protected boolean setException(Throwable t) {
- return super.setException(t);
- }
- }
-
- public static <V> DrillRpcFuture<V> getNewFuture(int coordinationId, Class<V> clazz) {
- InnerFuture<V> f = new InnerFuture<V>();
- return new DrillRpcFuture<V>(f, coordinationId, clazz);
- }
-
-
-}
\ No newline at end of file
+ public void addLightListener(RpcOutcomeListener<T> outcomeListener);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
new file mode 100644
index 0000000..ee14eeb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -0,0 +1,118 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
+
+ final int coordinationId;
+ private final Class<V> clazz;
+
+ public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
+ super(delegate);
+ this.coordinationId = coordinationId;
+ this.clazz = clazz;
+ }
+
+ public Class<V> getOutcomeClass(){
+ return clazz;
+ }
+
+ /**
+ * Drill doesn't currently support rpc cancellations since nearly all requests should be either instant or
+ * asynchronous. Business level cancellation is managed by a separate call (e.g. canceling a query). Calling this method
+ * will result in an UnsupportedOperationException.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException(
+ "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
+ }
+
+ @Override
+ protected RpcException mapException(Exception ex) {
+ Throwable e = ex;
+ while(e instanceof ExecutionException){
+ e = e.getCause();
+ }
+ if (e instanceof RpcException) return (RpcException) e;
+
+ return new RpcException(ex);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ void setValue(Object value) {
+ assert clazz.isAssignableFrom(value.getClass());
+ ((InnerFuture<V>) super.delegate()).setValue((V) value);
+ }
+
+ boolean setException(Throwable t) {
+ return ((InnerFuture<V>) super.delegate()).setException(t);
+ }
+
+ public static class InnerFuture<T> extends AbstractFuture<T> {
+ // we re-declare these so that the enclosing class can call them
+
+ void setValue(T value) {
+ super.set(value);
+ }
+
+ protected boolean setException(Throwable t) {
+ return super.setException(t);
+ }
+ }
+
+ public class RpcOutcomeListenerWrapper implements Runnable{
+ final RpcOutcomeListener<V> inner;
+
+ public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
+ super();
+ this.inner = inner;
+ }
+
+ @Override
+ public void run() {
+ try{
+ inner.success(DrillRpcFutureImpl.this.checkedGet());
+ }catch(RpcException e){
+ inner.failed(e);
+ }
+ }
+ }
+
+ public void addLightListener(RpcOutcomeListener<V> outcomeListener){
+ this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+ }
+
+
+
+ public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
+ InnerFuture<V> f = new InnerFuture<V>();
+ return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
index ab977db..be1ff6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
@@ -18,15 +18,20 @@
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+import java.io.InputStream;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
public class InboundRpcMessage extends RpcMessage{
public ByteBuf pBody;
+ public ByteBuf dBody;
public InboundRpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
- super(mode, rpcType, coordinationId, dBody);
+ super(mode, rpcType, coordinationId);
this.pBody = pBody;
+ this.dBody = dBody;
}
public int getBodySize(){
@@ -37,7 +42,7 @@ public class InboundRpcMessage extends RpcMessage{
void release(){
pBody.release();
- super.release();
+ if(dBody != null) dBody.release();
}
@Override
@@ -46,5 +51,7 @@ public class InboundRpcMessage extends RpcMessage{
+ coordinationId + ", dBody=" + dBody + "]";
}
-
+ public InputStream getProtobufBodyAsIS(){
+ return new ByteBufInputStream(pBody);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
index 91c3d45..e4858c4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import java.util.Arrays;
+
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -24,28 +26,49 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
-class OutboundRpcMessage extends RpcMessage{
+public class OutboundRpcMessage extends RpcMessage {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundRpcMessage.class);
final MessageLite pBody;
-
- public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf dBody) {
- super(mode, rpcType.getNumber(), coordinationId, dBody);
+ public ByteBuf[] dBodies;
+
+ public OutboundRpcMessage(RpcMode mode, EnumLite rpcType, int coordinationId, MessageLite pBody, ByteBuf... dBodies) {
+ super(mode, rpcType.getNumber(), coordinationId);
this.pBody = pBody;
+ this.dBodies = dBodies;
}
-
- public int getBodySize(){
+
+ public int getBodySize() {
int len = pBody.getSerializedSize();
len += RpcEncoder.getRawVarintSize(len);
- if(dBody != null) len += dBody.capacity();
+ len += getRawBodySize();
return len;
}
+ public int getRawBodySize(){
+ if(dBodies == null) return 0;
+ int len = 0;
+
+ for (int i = 0; i < dBodies.length; i++) {
+ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reader Index {}, Writer Index {}", dBodies[i].readerIndex(), dBodies[i].writerIndex());
+ len += dBodies[i].readableBytes();
+ }
+ return len;
+ }
+
@Override
public String toString() {
return "OutboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType + ", coordinationId="
- + coordinationId + ", dBody=" + dBody + "]";
+ + coordinationId + ", dBodies=" + Arrays.toString(dBodies) + "]";
}
-
+ void release(){
+ if(dBodies != null){
+ for(ByteBuf b : dBodies){
+ b.release();
+ }
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
new file mode 100644
index 0000000..cedba10
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+
+public class RemoteConnection{
+ private final Channel channel;
+
+ public RemoteConnection(Channel channel) {
+ super();
+ this.channel = channel;
+ }
+
+
+ public final Channel getChannel() {
+ return channel;
+ }
+
+
+ public ConnectionThrottle getConnectionThrottle(){
+ // can't be implemented until we switch to per query sockets.
+ return null;
+ }
+
+ public interface ConnectionThrottle{
+ public void disableReceiving();
+ public void enableReceiving();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
index 4bd592b..0c4ab7a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+
import io.netty.buffer.ByteBuf;
import com.google.protobuf.Internal.EnumLite;
@@ -27,13 +29,13 @@ public class Response {
public EnumLite rpcType;
public MessageLite pBody;
- public ByteBuf dBody;
+ public ByteBuf[] dBodies;
- public Response(EnumLite rpcType, MessageLite pBody, ByteBuf dBody) {
+ public Response(EnumLite rpcType, MessageLite pBody, ByteBuf... dBodies) {
super();
this.rpcType = rpcType;
this.pBody = pBody;
- this.dBody = dBody;
+ this.dBodies = dBodies;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 76300d1..11764db 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,113 +23,140 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
-import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
+import java.util.Arrays;
import java.util.concurrent.CancellationException;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+import org.slf4j.Logger;
import com.google.common.base.Preconditions;
import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
/**
- * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
+ * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a
+ * system.
+ *
* @param <T>
*/
-public abstract class RpcBus<T extends EnumLite> implements Closeable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcBus.class);
-
- private CoordinationQueue queue = new CoordinationQueue(16, 16);
- protected Channel channel;
+public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable {
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+ protected final CoordinationQueue queue = new CoordinationQueue(16, 16);
protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
- protected abstract Response handle(SocketChannel channel, int RpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
- public abstract boolean isClient();
-
- protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
- SEND protobufBody, Class<RECEIVE> clazz, ByteBuf dataBody) throws RpcException {
+ protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+
+ public abstract boolean isClient();
+
+ protected final RpcConfig rpcConfig;
+
+ public RpcBus(RpcConfig rpcConfig) {
+ this.rpcConfig = rpcConfig;
+ }
+
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+
+ assert !Arrays.asList(dataBodies).contains(null);
+ assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
+
ByteBuf pBuffer = null;
boolean completed = false;
try {
-// logger.debug("Seding message");
+ // logger.debug("Sending message");
Preconditions.checkNotNull(protobufBody);
- DrillRpcFuture<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
- OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBody);
- ChannelFuture channelFuture = channel.write(m);
+ DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
+ OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+ ChannelFuture channelFuture = connection.getChannel().write(m);
channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
completed = true;
return rpcFuture;
} finally {
if (!completed) {
if (pBuffer != null) pBuffer.release();
- if (dataBody != null) dataBody.release();
+ if (dataBodies != null) {
+ for (ByteBuf b : dataBodies) {
+ b.release();
+ }
+
+ }
}
;
}
}
-
- public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture>{
+ public abstract C initRemoteConnection(Channel channel);
+
+ public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel().remoteAddress());
- queue.channelClosed(new ChannelClosedException());
+ logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
+ .remoteAddress());
+ closeQueueDueToChannelClose();
}
}
-
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch){
+
+ protected void closeQueueDueToChannelClose() {
+ if (this.isClient()) queue.channelClosed(new ChannelClosedException());
+ }
+
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
return new ChannelClosedHandler();
}
-
+
protected class InboundHandler extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
- private final SocketChannel channel;
-
-
- public InboundHandler(SocketChannel channel) {
+ private final C connection;
+ public InboundHandler(C connection) {
super();
- this.channel = channel;
+ this.connection = connection;
}
-
@Override
public void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage msg) throws Exception {
- if(!ctx.channel().isOpen()) return;
+ if (!ctx.channel().isOpen()) return;
- if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
- switch(msg.mode){
+ if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
+ switch (msg.mode) {
case REQUEST:
// handle message and ack.
- Response r = handle(channel, msg.rpcType, msg.pBody, msg.dBody);
- OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId, r.pBody, r.dBody);
- if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
+ Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
+ assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
+ OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId,
+ r.pBody, r.dBodies);
+ if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
ctx.write(outMessage);
break;
-
+
case RESPONSE:
MessageLite m = getResponseDefaultInstance(msg.rpcType);
- DrillRpcFuture<?> rpcFuture = queue.getFuture(msg.coordinationId, m.getClass());
+ assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+ DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
Parser<?> parser = m.getParserForType();
Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
rpcFuture.setValue(value);
- if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+ if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+
break;
-
+
case RESPONSE_FAILURE:
RpcFailure failure = RpcFailure.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
queue.updateFailedFuture(msg.coordinationId, failure);
- if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+ if (RpcConstants.EXTRA_DEBUGGING)
+ logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
break;
-
+
default:
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
}
@@ -147,18 +174,18 @@ public abstract class RpcBus<T extends EnumLite> implements Closeable{
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
-// logger.debug("Completed channel write.");
-
+ // logger.debug("Completed channel write.");
+
if (channelFuture.isCancelled()) {
- DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+ DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
rpcFuture.setException(new CancellationException("Socket operation was canceled."));
} else if (!channelFuture.isSuccess()) {
try {
channelFuture.get();
- throw new IllegalStateException(
- "Future was described as completed and not succesful but did not throw an exception.");
+ throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
} catch (Exception e) {
- DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+ logger.error("Error occurred during Rpc", e);
+ DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
rpcFuture.setException(e);
}
} else {
@@ -168,6 +195,13 @@ public abstract class RpcBus<T extends EnumLite> implements Closeable{
}
}
-
-
+
+ /**
+  * Decodes a protobuf message from the supplied buffer using the supplied parser.
+  *
+  * @param pBody buffer holding the serialized message; it is read through a {@link ByteBufInputStream}
+  * @param parser parser used to decode the buffer contents
+  * @return the message produced by {@code parser}
+  * @throws RpcException wrapping the {@link InvalidProtocolBufferException} raised when decoding fails
+  */
+ public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
+   try {
+     return parser.parseFrom(new ByteBufInputStream(pBody));
+   } catch (InvalidProtocolBufferException decodeFailure) {
+     final String parserType = parser.getClass().getCanonicalName();
+     throw new RpcException(String.format("Failure while decoding message with parser of type. %s", parserType), decodeFailure);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
new file mode 100644
index 0000000..c6b4c49
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Named registry of the message types an RPC endpoint sends and receives. Definitions are looked up either by the
+ * send enum or by the numeric value of the receive enum. Each {@code check*} method returns {@code true} when the
+ * supplied classes agree with the registered definition and throws {@link IllegalStateException} otherwise, so the
+ * calls can be wrapped in {@code assert} statements.
+ */
+public class RpcConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
+
+  private final String name;
+  private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
+  private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
+
+  /** Copies both lookup tables into immutable maps; instances are created through {@link RpcConfigBuilder}. */
+  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap){
+    this.name = name;
+    this.sendMap = ImmutableMap.copyOf(sendMap);
+    this.receiveMap = ImmutableMap.copyOf(receiveMap);
+  }
+
+  /** @return the name given to this configuration; it prefixes every validation error message */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Verifies that a message received with the given rpc type number is decoded as the registered return class.
+   *
+   * @param rpcType numeric rpc type of the received message
+   * @param receiveClass class the caller intends to decode the message as
+   * @return always {@code true} when the check passes
+   * @throws IllegalStateException if no definition exists for {@code rpcType} or the classes differ
+   */
+  public boolean checkReceive(int rpcType, Class<?> receiveClass){
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
+    }
+
+    final RpcMessageType<?, ?, ?> definition = receiveMap.get(rpcType);
+    if (definition == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc receive type number of %s.", name, rpcType));
+    }
+
+    if (definition.getRet() == receiveClass) {
+      return true;
+    }
+    throw new IllegalStateException(String.format("%s: The definition for receive doesn't match implementation code. The definition is %s however the current receive for this type was of type %s.", name, definition, receiveClass.getCanonicalName()));
+  }
+
+  /**
+   * Verifies that an outbound request uses the registered send class and expects the registered return class.
+   *
+   * @param send rpc type being sent
+   * @param sendClass class of the object being sent
+   * @param receiveClass class the caller expects to get back
+   * @return always {@code true} when the check passes
+   * @throws IllegalStateException if no definition exists for {@code send} or either class differs
+   */
+  public boolean checkSend(EnumLite send, Class<?> sendClass, Class<?> receiveClass){
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug(String.format("Checking send classes for send RpcType %s. Send Class is %s and Receive class is %s.", send, sendClass, receiveClass));
+    }
+
+    final RpcMessageType<?, ?, ?> definition = sendMap.get(send);
+    if (definition == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc send type of %s.", name, send));
+    }
+
+    // The send class is validated before the expected return class.
+    if (definition.getSend() != sendClass) {
+      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to send an object of type %s.", name, definition, sendClass.getCanonicalName()));
+    }
+    if (definition.getRet() != receiveClass) {
+      throw new IllegalStateException(String.format("%s: The definition for send doesn't match implementation code. The definition is %s however the current send is trying to setup an expected reception of an object of type %s.", name, definition, receiveClass.getCanonicalName()));
+    }
+
+    return true;
+  }
+
+  /**
+   * Verifies that a response of the given type carries the registered return class. The definition is looked up in
+   * the receive table using the numeric value of {@code responseType}.
+   *
+   * @param responseType rpc type of the response being sent
+   * @param responseClass class of the response body
+   * @return always {@code true} when the check passes
+   * @throws IllegalStateException if no definition exists for {@code responseType} or the class differs
+   */
+  public boolean checkResponseSend(EnumLite responseType, Class<?> responseClass){
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug(String.format("Checking responce send of type %s with response class of %s.", responseType, responseClass));
+    }
+
+    final RpcMessageType<?, ?, ?> definition = receiveMap.get(responseType.getNumber());
+    if (definition == null) {
+      throw new IllegalStateException(String.format("%s: There is no defined RpcMessage type for a Rpc response of type %s.", name, responseType));
+    }
+    if (definition.getRet() != responseClass) {
+      throw new IllegalStateException(String.format("%s: The definition for the response doesn't match implementation code. The definition is %s however the current response is trying to response with an object of type %s.", name, definition, responseClass.getCanonicalName()));
+    }
+
+    return true;
+  }
+
+  /**
+   * One registered exchange: the enum and class that are sent, paired with the enum and class that come back.
+   *
+   * @param <SEND> message class that is sent
+   * @param <RECEIVE> message class that is returned
+   * @param <T> enum type identifying both directions
+   */
+  public static class RpcMessageType<SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>{
+    private T sendEnum;
+    private Class<SEND> send;
+    private T receiveEnum;
+    private Class<RECEIVE> ret;
+
+    public RpcMessageType(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> ret) {
+      this.sendEnum = sendEnum;
+      this.send = send;
+      this.receiveEnum = receiveEnum;
+      this.ret = ret;
+    }
+
+    public T getSendEnum() {
+      return sendEnum;
+    }
+
+    public void setSendEnum(T sendEnum) {
+      this.sendEnum = sendEnum;
+    }
+
+    public Class<SEND> getSend() {
+      return send;
+    }
+
+    public void setSend(Class<SEND> send) {
+      this.send = send;
+    }
+
+    public T getReceiveEnum() {
+      return receiveEnum;
+    }
+
+    public void setReceiveEnum(T receiveEnum) {
+      this.receiveEnum = receiveEnum;
+    }
+
+    public Class<RECEIVE> getRet() {
+      return ret;
+    }
+
+    public void setRet(Class<RECEIVE> ret) {
+      this.ret = ret;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder text = new StringBuilder("RpcMessageType [sendEnum=");
+      text.append(sendEnum);
+      text.append(", send=").append(send);
+      text.append(", receiveEnum=").append(receiveEnum);
+      text.append(", ret=").append(ret);
+      return text.append("]").toString();
+    }
+  }
+
+  /**
+   * Starts a new builder.
+   *
+   * @param name name to give the resulting configuration
+   */
+  public static RpcConfigBuilder newBuilder(String name){
+    return new RpcConfigBuilder(name);
+  }
+
+  /** Accumulates message type definitions and produces an {@link RpcConfig} from them. */
+  public static class RpcConfigBuilder {
+    private final String name;
+    private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
+    private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
+
+    private RpcConfigBuilder(String name){
+      this.name = name;
+    }
+
+    /**
+     * Registers one exchange. The definition is stored under {@code sendEnum} in the send table and under the
+     * numeric value of {@code receiveEnum} in the receive table; a later registration with the same key replaces
+     * the earlier entry in that table.
+     *
+     * @return this builder, for chaining
+     */
+    public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec){
+      final RpcMessageType<SEND, RECEIVE, T> definition = new RpcMessageType<SEND, RECEIVE, T>(sendEnum, send, receiveEnum, rec);
+      sendMap.put(sendEnum, definition);
+      receiveMap.put(receiveEnum.getNumber(), definition);
+      return this;
+    }
+
+    /** @return a configuration holding immutable copies of everything registered so far */
+    public RpcConfig build(){
+      return new RpcConfig(name, sendMap, receiveMap);
+    }
+  }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
index 134e54b..4e9714b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.MessageToMessageDecoder;
@@ -32,14 +33,19 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
* Converts a previously length adjusted buffer into an RpcMessage.
*/
class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class);
+ final org.slf4j.Logger logger;
private final AtomicLong messageCounter = new AtomicLong();
+ /**
+  * Creates a decoder with its own logger, named after this class's canonical name followed by a dot and
+  * {@code name}.
+  *
+  * @param name suffix appended to the logger name
+  */
+ public RpcDecoder(String name){
+   final String loggerName = RpcDecoder.class.getCanonicalName() + "." + name;
+   this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
+ }
+
+
@Override
- protected InboundRpcMessage decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
if(!ctx.channel().isOpen()){
- return null;
+ return;
}
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Inbound rpc message received.");
@@ -94,7 +100,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
buffer.skipBytes(dBodyLength);
messageCounter.incrementAndGet();
if (RpcConstants.EXTRA_DEBUGGING) logger.trace("Inbound Rpc Message Decoded {}.", m);
- return m;
+ out.add(m);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index 8d3d97c..f76d648 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -34,7 +34,7 @@ import com.google.protobuf.WireFormat;
* Converts an RPCMessage into wire format.
*/
class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class);
+ final org.slf4j.Logger logger;
static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
@@ -43,6 +43,9 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
+ /**
+  * Creates an encoder with its own logger, named after this class's canonical name followed by a dot and
+  * {@code name}.
+  *
+  * @param name suffix appended to the logger name
+  */
+ public RpcEncoder(String name){
+   final String loggerName = RpcEncoder.class.getCanonicalName() + "." + name;
+   this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
+ }
@Override
public void flush(ChannelHandlerContext ctx, OutboundRpcMessage msg) throws Exception {
@@ -61,7 +64,7 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
// figure out the full length
int headerLength = header.getSerializedSize();
int protoBodyLength = msg.pBody.getSerializedSize();
- int rawBodyLength = msg.dBody == null ? 0 : msg.dBody.readableBytes();
+ int rawBodyLength = msg.getRawBodySize();
int fullLength = //
HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength + //
PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; //
@@ -89,11 +92,15 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
msg.pBody.writeTo(cos);
// if exists, write data body and tag.
- if(msg.dBody != null && msg.dBody.readableBytes() > 0){
+ // TODO: is it possible to avoid this copy, i think so...
+ if(msg.getRawBodySize() > 0){
+ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Writing raw body of size {}", msg.getRawBodySize());
cos.writeRawVarint32(RAW_BODY_TAG);
cos.writeRawVarint32(rawBodyLength);
cos.flush(); // need to flush so that dbody goes after if cos is caching.
- buf.writeBytes(msg.dBody);
+ for(int i =0; i < msg.dBodies.length; i++){
+ buf.writeBytes(msg.dBodies[i]);
+ }
}else{
cos.flush();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index ef1b88f..a0aed94 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -25,28 +25,23 @@ public class RpcExceptionHandler implements ChannelHandler{
public RpcExceptionHandler(){
}
-
- @Override
- public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
- }
- @Override
- public void afterAdd(ChannelHandlerContext ctx) throws Exception {
- }
@Override
- public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if(!ctx.channel().isOpen()) return;
+ logger.info("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+ ctx.close();
}
+
@Override
- public void afterRemove(ChannelHandlerContext ctx) throws Exception {
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
+
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if(!ctx.channel().isOpen()) return;
- logger.info("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
- ctx.close();
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
index fd1938d..08ea150 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
@@ -27,19 +27,14 @@ public abstract class RpcMessage {
public RpcMode mode;
public int rpcType;
public int coordinationId;
- public ByteBuf dBody;
- public RpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf dBody) {
+ public RpcMessage(RpcMode mode, int rpcType, int coordinationId) {
this.mode = mode;
this.rpcType = rpcType;
this.coordinationId = coordinationId;
- this.dBody = dBody;
}
public abstract int getBodySize();
-
- void release(){
- if(dBody != null) dBody.release();
- }
+ abstract void release();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
new file mode 100644
index 0000000..fac908c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+/**
+ * Base class for listeners with a failure callback and a success callback. Both callbacks do nothing by default, so
+ * a subclass only needs to override the one it cares about.
+ *
+ * @param <V> type of the value handed to {@link #success(Object)}
+ */
+public abstract class RpcOutcomeListener<V> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+
+  /**
+   * No-op by default.
+   *
+   * @param ex the exception describing the failure
+   */
+  public void failed(RpcException ex) {
+  }
+
+  /**
+   * No-op by default.
+   *
+   * @param value the resulting value
+   */
+  public void success(V value) {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 462bc52..20a7d7d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
@@ -30,12 +31,14 @@ import com.google.protobuf.CodedInputStream;
public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZeroCopyProtobufLengthDecoder.class);
+
@Override
- protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
+
if(!ctx.channel().isOpen()){
logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
in.skipBytes(in.readableBytes());
- return null;
+ return;
}
in.markReaderIndex();
@@ -43,7 +46,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
for (int i = 0; i < buf.length; i ++) {
if (!in.isReadable()) {
in.resetReaderIndex();
- return null;
+ return;
}
buf[i] = in.readByte();
@@ -60,13 +63,14 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
if (in.readableBytes() < length) {
in.resetReaderIndex();
- return null;
+ return;
} else {
- ByteBuf out = in.slice(in.readerIndex(), length);
+ ByteBuf outBuf = in.slice(in.readerIndex(), length);
in.retain();
in.skipBytes(length);
if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i+1, length));
- return out;
+ out.add(outBuf);
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
new file mode 100644
index 0000000..ecaf8d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/AvailabilityListener.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+/**
+ * Callback that is handed a {@link BitConnection}.
+ * NOTE(review): presumably invoked once the connection is ready for use -- confirm against the callers.
+ */
+public interface AvailabilityListener {
+  /**
+   * @param connection the connection being reported
+   */
+  void isAvailable(BitConnection connection);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index b16c6cb..4ba99a1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -18,45 +18,88 @@
package org.apache.drill.exec.rpc.bit;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
import com.google.protobuf.MessageLite;
-public class BitClient extends BasicClient<RpcType>{
-
- private final DrillbitContext context;
+public class BitClient extends BasicClient<RpcType, BitConnection>{
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
+
private final BitComHandler handler;
+ private final DrillbitEndpoint endpoint;
+ private BitConnection connection;
+ private final AvailabilityListener openListener;
+ private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+ private final ListenerPool listeners;
- public BitClient(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
- super(alloc, eventLoopGroup);
- this.context = context;
+ public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+ super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+
+ this.endpoint = endpoint;
this.handler = handler;
+ this.openListener = openListener;
+ this.registry = registry;
+ this.listeners = listeners;
}
+ /**
+  * Connects to the endpoint's address and bit port via {@code connectAsClient}, sending a handshake that carries
+  * {@code BitRpcConfig.RPC_VERSION}, and then records the endpoint on the connection.
+  *
+  * @return the handshake returned by {@code connectAsClient}
+  * @throws RpcException propagated from {@code connectAsClient}
+  * @throws InterruptedException propagated from {@code connectAsClient}
+  */
+ public BitHandshake connect() throws RpcException, InterruptedException{
+   final BitHandshake outbound = BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
+   final BitHandshake inbound = connectAsClient(RpcType.HANDSHAKE, outbound, endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
+   // NOTE(review): relies on initRemoteConnection having assigned the connection field by now -- confirm.
+   connection.setEndpoint(endpoint);
+   return inbound;
+ }
+
+ @SuppressWarnings("unchecked")
@Override
- protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
- return handler.getResponseDefaultInstance(rpcType);
+ public BitConnection initRemoteConnection(Channel channel) {
+ this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+ return connection;
}
@Override
- protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handler.handle(context, rpcType, pBody, dBody);
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
+ return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
- return super.getCloseHandler(ch);
+ public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ return BitComDefaultInstanceHandler.getResponseDefaultInstance(rpcType);
}
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
+ @Override
+ protected Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ return handler.handle(connection, rpcType, pBody, dBody);
+ }
+
+ /**
+  * Builds the handler that validates the handshake received from the bit server.
+  *
+  * @return a handler that rejects any handshake whose rpc version differs from {@code BitRpcConfig.RPC_VERSION}
+  */
+ @Override
+ protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
+   return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
+
+     /**
+      * @throws RpcException if the remote rpc version does not match the local one
+      */
+     @Override
+     protected void validateHandshake(BitHandshake inbound) throws Exception {
+       logger.debug("Handling handshake from bit server to bit client. {}", inbound);
+       // "Expected" is the local version and "actual" is what the remote side reported; the arguments were
+       // previously passed in the opposite order, so the message swapped the two values.
+       if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+     }
+
+   };
+ }
+
+ /**
+  * @return the connection most recently created by {@code initRemoteConnection}, or {@code null} if none has been
+  *         created yet
+  */
+ public BitConnection getConnection(){
+   return connection;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index 2349899..c60d36b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -17,73 +17,34 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.GenericFutureListener;
-
import java.io.Closeable;
-import java.util.Collection;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
/**
- * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a server
- * or a client depending on who initially made the connection. If no connection exists, BitCom is
- * responsible for making a connection. BitCom should automatically straight route local BitCommunication rather than connecting to its self.
+ * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a
+ * server or a client depending on who initially made the connection. If no connection exists, BitCom is responsible for
+ * making a connection. BitCom should automatically route local BitCommunication directly rather than connecting to
+ * itself.
*/
-public interface BitCom extends Closeable{
+public interface BitCom extends Closeable {
/**
- * Routes the output of a RecordBatch to another node. The record batch
- * @param node The node id to send the record batch to.
- * @param batch The record batch to send.
- * @return A SendProgress object which can be used to monitor the sending of the batch.
- */
- public abstract DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch);
-
-
- /**
- * Requests an iterator to access an incoming record batch.
- * @param fragmentId
- * @return
- */
- public RecordBatch getReceivingRecordBatchHandle(int majorFragmentId, int minorFragmentId);
-
- /**
- * Send a query PlanFragment to another bit.
- * @param context
+ * Get a Bit to Bit communication tunnel. If the BitCom doesn't have a tunnel attached to the node already, it will
+ * start creating one. This creates the connection asynchronously.
+ *
* @param node
- * @param fragment
* @return
*/
- public abstract DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node, PlanFragment fragment);
+ public BitTunnel getTunnel(DrillbitEndpoint node) ;
- public abstract void startQuery(Collection<DrillbitEndpoint> firstNodes, long queryId);
-
-
- public abstract DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
-
- public abstract DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
-
-
- public interface TunnelListener extends GenericFutureListener<ChannelFuture> {
- public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus);
- }
-
- public interface SendManager{
- /**
- * Sender responsible for regularly checking this value to see whether it should continue to send or yield the process
- * @return
- */
- public boolean canContinue();
- }
+ public int start() throws InterruptedException, DrillbitStartupException;
+ /**
+ * Register an incoming batch handler for a local foreman.
+ * @param handler
+ */
+ public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
new file mode 100644
index 0000000..e1d4902
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComDefaultInstanceHandler.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.protobuf.MessageLite;
+
+/**
+ * Lookup of the protobuf default instance that corresponds to a bit-to-bit response RPC type.
+ */
+public class BitComDefaultInstanceHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComDefaultInstanceHandler.class);
+
+
+  /**
+   * Returns the default message instance for the given response type.
+   *
+   * @param rpcType numeric value of an {@link RpcType} constant
+   * @return the default instance of the message class associated with {@code rpcType}
+   * @throws UnsupportedOperationException if {@code rpcType} is not one of the response types handled here
+   * @throws RpcException declared in the signature; not thrown by the current implementation
+   */
+  public static MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    switch (rpcType) {
+    case RpcType.ACK_VALUE:
+      return Ack.getDefaultInstance();
+    case RpcType.HANDSHAKE_VALUE:
+      return BitHandshake.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
+      return FragmentHandle.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_STATUS_VALUE:
+      return FragmentStatus.getDefaultInstance();
+    case RpcType.RESP_BIT_STATUS_VALUE:
+      return BitStatus.getDefaultInstance();
+
+    default:
+      // Name the offending type so an unexpected message can be diagnosed from the exception alone.
+      throw new UnsupportedOperationException("No response default instance is defined for bit rpc type " + rpcType + ".");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
deleted file mode 100644
index b2c5cbb..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.bit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.socket.SocketChannel;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitBatchChunk;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.bit.BitCom.TunnelListener;
-import org.apache.drill.exec.rpc.bit.BitComImpl.TunnelModifier;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import com.google.protobuf.MessageLite;
-
-public class BitComHandler {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComHandler.class);
-
- private final TunnelModifier modifier;
-
- public BitComHandler(TunnelModifier modifier){
- this.modifier = modifier;
- }
-
- public TunnelListener getTunnelListener(RpcBus<?>.ChannelClosedHandler internalHandler){
- return new Listener(internalHandler);
- }
-
- public class Listener implements TunnelListener {
- final RpcBus<?>.ChannelClosedHandler internalHandler;
-
- public Listener(RpcBus<?>.ChannelClosedHandler internalHandler) {
- this.internalHandler = internalHandler;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- logger.debug("BitTunnel closed, removing from BitCom.");
- internalHandler.operationComplete(future);
- BitTunnel t = modifier.remove(future.channel());
- if(t != null) t.shutdownIfClient();
- }
-
- @Override
- public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
- modifier.create(channel, endpoint, bus);
- }
-
- }
-
-
-
-
- public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
- switch (rpcType) {
- case RpcType.ACK_VALUE:
- return Ack.getDefaultInstance();
- case RpcType.HANDSHAKE_VALUE:
- return BitHandshake.getDefaultInstance();
- case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
- return FragmentHandle.getDefaultInstance();
- case RpcType.RESP_FRAGMENT_STATUS_VALUE:
- return FragmentStatus.getDefaultInstance();
- case RpcType.RESP_BIT_STATUS_VALUE:
- return BitStatus.getDefaultInstance();
- case RpcType.RESP_BATCH_CHUNK_VALUE:
- return BitBatchChunk.getDefaultInstance();
-
- default:
- throw new UnsupportedOperationException();
- }
- }
-
- protected Response handle(DrillbitContext context, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- switch (rpcType) {
-
- case RpcType.HANDSHAKE_VALUE:
- // parse incoming handshake.
- // get endpoint information.
- // record endpoint information in registry.
- // respond with our handshake info.
- return new Response(RpcType.HANDSHAKE, BitHandshake.getDefaultInstance(), null);
-
- case RpcType.REQ_BATCH_CHUNK_VALUE:
- return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
-
- case RpcType.REQ_BIT_STATUS_VALUE:
- return new Response(RpcType.RESP_BIT_STATUS, BitStatus.getDefaultInstance(), null);
-
- case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
- return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
-
- case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- return new Response(RpcType.RESP_FRAGMENT_STATUS, FragmentStatus.getDefaultInstance(), null);
-
- case RpcType.REQ_INIATILIZE_FRAGMENT_VALUE:
- return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
-
- case RpcType.REQ_RECORD_BATCH_VALUE:
- return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
-
- default:
- throw new UnsupportedOperationException();
- }
-
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index aada154..c98be44 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -17,138 +17,158 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
-import io.netty.channel.Channel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * Manages communication tunnels between nodes.
+ */
public class BitComImpl implements BitCom {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
- private Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newConcurrentMap();
- private Map<SocketChannel, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
- private Object lock = new Object();
- private BitServer server;
- private DrillbitContext context;
+ private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
+ private final ListenerPool listeners;
+ private volatile BitServer server;
+ private final BitComHandler handler;
+ private final BootStrapContext context;
+
+ // TODO: this executor should be removed.
+ private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
- public BitComImpl(DrillbitContext context) {
+ public BitComImpl(BootStrapContext context, BitComHandler handler) {
+ super();
+ this.handler = handler;
this.context = context;
+ this.listeners = new ListenerPool(8);
}
public int start() throws InterruptedException, DrillbitStartupException {
- server = new BitServer(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), context);
+ server = new BitServer(handler, context, registry, listeners);
int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
return server.bind(port);
}
- private Future<BitTunnel> getNode(DrillbitEndpoint endpoint) {
- return null;
+ /**
+  * Obtains a future for the connection to {@code endpoint}, creating a new client connection when needed.
+  *
+  * @param endpoint the remote bit to connect to
+  * @param check when {@code true}, a connection already present in {@code registry} completes the returned future
+  *          immediately; when {@code false}, the registry is not consulted and a new client is always created
+  * @return a future that is set once the {@link AvailWatcher} given to the new client reports a connection, or that
+  *         fails with a {@link FragmentSetupException} carrying the connect error as its cause
+  */
+ private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
+
+
+   SettableFuture<BitConnection> future = SettableFuture.create();
+   BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
+
+   if (check) {
+     BitConnection t = registry.get(endpoint);
+
+     if (t != null) {
+       future.set(t);
+       return checkedFuture;
+     }
+   }
-// BitTunnel t = tunnels.get(endpoint);
-// if (t == null) {
-// synchronized (lock) {
-// t = tunnels.get(endpoint);
-// if (t != null) return t;
-// BitClient c = new BitClient(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(),
-// context.getBitLoopGroup(), context);
-//
-// // need to figure what to do here with regards to waiting for handshake before returning. Probably need to add
-// // future registry so that new endpoint registration ping the registry.
-// throw new UnsupportedOperationException();
-// c.connectAsClient(endpoint.getAddress(), endpoint.getBitPort()).await();
-// t = new BitTunnel(c);
-// tunnels.put(endpoint, t);
-//
-// }
-// }
-// return null;
+   try {
+     AvailWatcher watcher = new AvailWatcher(future);
+     BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
+     c.connect();
+     return checkedFuture;
+   } catch (InterruptedException | RpcException e) {
+     if (e instanceof InterruptedException) {
+       // The interrupt flag is cleared when InterruptedException is thrown; restore it for code further up the stack.
+       Thread.currentThread().interrupt();
+     }
+     // Keep the original failure as the cause so the reason the connection could not be opened is not lost.
+     future.setException(new FragmentSetupException("Unable to open connection", e));
+     return checkedFuture;
+   }
+
 }
-
+ private class AvailWatcher implements AvailabilityListener{
+ final SettableFuture<BitConnection> future;
+
+ public AvailWatcher(SettableFuture<BitConnection> future) {
+ super();
+ this.future = future;
+ }
- @Override
- public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node,
- PlanFragment fragment) {
- return null;
+ @Override
+ public void isAvailable(BitConnection connection) {
+ future.set(connection);
+ }
+
+ }
+
+ /**
+  * Returns the connection registered for {@code endpoint}; when the registry has none, requests a new one through
+  * {@code getNode} and waits on the resulting future.
+  *
+  * @throws RpcException if obtaining the new connection fails
+  */
+ BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
+   final BitConnection registered = registry.get(endpoint);
+   if (registered == null) {
+     return getNode(endpoint, false).checkedGet();
+   }
+   return registered;
 }
- @Override
- public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle) {
- return null;
+
+ /**
+  * Non-waiting variant of the connection lookup: asks {@code getNode} to consult the registry first and returns its
+  * future without calling {@code checkedGet()} on it.
+  */
+ CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
+   final CheckedFuture<BitConnection, RpcException> pending = getNode(endpoint, true);
+   return pending;
 }
+
+ /**
+  * Creates a tunnel to {@code endpoint}. A connection already present in {@code registry} is reused; otherwise a new
+  * connection is requested through {@code getNode} and its future is handed to the tunnel.
+  *
+  * @param endpoint the remote bit the tunnel should talk to
+  * @return a new tunnel backed either by the registered connection or by the future of a new one
+  */
 @Override
- public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node,
- FragmentHandle handle) {
- return null;
+ public BitTunnel getTunnel(DrillbitEndpoint endpoint){
+   BitConnection t = registry.get(endpoint);
+   if(t != null){
+     // Already connected: wrap the registered connection directly.
+     return new BitTunnel(exec, endpoint, this, t);
+   }else{
+     // No connection yet: give the tunnel the future of a newly requested connection.
+     return new BitTunnel(exec, endpoint, this, this.getNode(endpoint, false));
+   }
 }
- private final TunnelModifier modifier = new TunnelModifier();
/**
- * Fully synchronized modifier. Contention should be low since endpoints shouldn't be constantly changing.
+ * A future which remaps exceptions to a BitComException.
+ * @param <T>
*/
- class TunnelModifier {
- public BitTunnel remove(Channel ch) {
- synchronized (this) {
- DrillbitEndpoint endpoint = endpoints.remove(ch);
- if (endpoint == null) {
- logger
- .warn("We attempted to find a endpoint from a provided channel and found none. This suggests a race condition or memory leak problem.");
- return null;
- }
-
- BitTunnel tunnel = tunnels.remove(endpoint);
- return tunnel;
- }
+ private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
+
+ protected BitComFuture(ListenableFuture<T> delegate) {
+ super(delegate);
}
- public void create(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
- synchronized (this) {
- endpoints.put(channel, endpoint);
- tunnels.put(endpoint, new BitTunnel(bus));
+ @Override
+ protected RpcException mapException(Exception e) {
+ Throwable t = e;
+ if(e instanceof ExecutionException){
+ t = e.getCause();
}
+
+ if(t instanceof RpcException) return (RpcException) t;
+ return new RpcException(t);
}
}
public void close() {
Closeables.closeQuietly(server);
- for (BitTunnel bt : tunnels.values()) {
+ for (BitConnection bt : registry.values()) {
bt.shutdownIfClient();
}
}
-
@Override
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch) {
- return null;
- }
-
- @Override
- public RecordBatch getReceivingRecordBatchHandle(int majorFragmentId, int minorFragmentId) {
- return null;
- }
-
- @Override
- public void startQuery(Collection<DrillbitEndpoint> firstNodes, long queryId) {
+ public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+ this.handler.registerIncomingFragmentHandler(handler);
}
+
+
}