You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/09 17:51:48 UTC
[2/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778,
DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.
DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/960f876a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/960f876a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/960f876a
Branch: refs/heads/master
Commit: 960f876a1945eee4eeb0d6a98c5c58dfe2eea1a9
Parents: d4f9bf2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri May 8 08:15:36 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 07:12:23 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../apache/drill/exec/client/DrillClient.java | 8 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 110 ++++++++++----
.../exec/rpc/BasicClientWithConnection.java | 21 +--
.../org/apache/drill/exec/rpc/BasicServer.java | 78 +++++++---
.../drill/exec/rpc/CoordinationQueue.java | 2 +-
.../apache/drill/exec/rpc/RemoteConnection.java | 31 +++-
.../java/org/apache/drill/exec/rpc/RpcBus.java | 151 +++++++++++--------
.../org/apache/drill/exec/rpc/RpcConfig.java | 35 ++++-
.../drill/exec/rpc/RpcExceptionHandler.java | 12 +-
.../drill/exec/rpc/control/ControlClient.java | 14 +-
.../exec/rpc/control/ControlConnection.java | 7 +-
.../exec/rpc/control/ControlRpcConfig.java | 25 +--
.../drill/exec/rpc/control/ControlServer.java | 13 +-
.../exec/rpc/data/BitServerConnection.java | 7 +-
.../apache/drill/exec/rpc/data/DataClient.java | 11 +-
.../exec/rpc/data/DataClientConnection.java | 6 +-
.../drill/exec/rpc/data/DataRpcConfig.java | 15 +-
.../apache/drill/exec/rpc/data/DataServer.java | 11 +-
.../drill/exec/rpc/user/QueryResultHandler.java | 36 ++++-
.../apache/drill/exec/rpc/user/UserClient.java | 19 ++-
.../drill/exec/rpc/user/UserRpcConfig.java | 22 ++-
.../apache/drill/exec/rpc/user/UserServer.java | 23 ++-
.../apache/drill/exec/work/foreman/Foreman.java | 19 +++
.../src/main/resources/drill-module.conf | 2 +
.../drill/exec/proto/GeneralRPCProtos.java | 26 +++-
.../drill/exec/proto/SchemaUserProtos.java | 7 +
.../org/apache/drill/exec/proto/UserProtos.java | 123 ++++++++++++---
.../apache/drill/exec/proto/beans/RpcMode.java | 6 +-
.../exec/proto/beans/UserToBitHandshake.java | 23 +++
protocol/src/main/protobuf/GeneralRPC.proto | 2 +
protocol/src/main/protobuf/User.proto | 1 +
32 files changed, 626 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be8c7a0..97d5770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -42,7 +42,9 @@ public interface ExecConstants {
public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
public static final String SERVICE_NAME = "drill.exec.cluster-id";
public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+ public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
+ public static final String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";
public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
public static final String FUNCTION_PACKAGES = "drill.exec.functions";
public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 9924704..136d8c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -20,20 +20,17 @@ package org.apache.drill.exec.client;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
-
-import com.google.common.base.Strings;
import io.netty.buffer.DrillBuf;
+import io.netty.channel.EventLoopGroup;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
-import io.netty.channel.EventLoopGroup;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
@@ -63,6 +60,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -208,7 +206,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
- client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
+ client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup);
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
connect(endpoint);
connected = true;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 72ae130..1661f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -20,18 +20,24 @@ package org.apache.drill.exec.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import com.google.protobuf.Internal.EnumLite;
@@ -40,7 +46,12 @@ import com.google.protobuf.Parser;
public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
extends RpcBus<T, R> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+
+ // The fraction of the RPC timeout that may pass without a write before sending a ping message to ensure the server
+ // doesn't time us out. For example, if timeout is set to 30 seconds and we set the fraction to 0.5, then if no write
+ // has happened within 15 seconds, the idle state handler will send a ping message.
+ private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
private final Bootstrap b;
protected R connection;
@@ -48,18 +59,23 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
private final Class<HANDSHAKE_RESPONSE> responseClass;
private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
+ private final IdlePingHandler pingHandler;
+
public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
super(rpcMapping);
this.responseClass = responseClass;
this.handshakeType = handshakeType;
this.handshakeParser = handshakeParser;
+ final long timeoutInMillis = rpcMapping.hasTimeout() ? (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING)
+ : -1;
+ this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
b = new Bootstrap() //
.group(eventLoopGroup) //
.channel(TransportCheck.getClientSocketChannel()) //
.option(ChannelOption.ALLOCATOR, alloc) //
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 1 << 17) //
.option(ChannelOption.SO_SNDBUF, 1 << 17) //
@@ -68,40 +84,72 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
protected void initChannel(SocketChannel ch) throws Exception {
-// logger.debug("initializing client connection.");
+ // logger.debug("initializing client connection.");
connection = initRemoteConnection(ch);
- ch.closeFuture().addListener(getCloseHandler(connection));
-
- ch.pipeline().addLast( //
- getDecoder(connection.getAllocator()), //
- new RpcDecoder("c-" + rpcConfig.getName()), //
- new RpcEncoder("c-" + rpcConfig.getName()), //
- new ClientHandshakeHandler(), //
- new InboundHandler(connection), //
- new RpcExceptionHandler() //
- );
+
+ ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+ final ChannelPipeline pipe = ch.pipeline();
+
+ pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
+ pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
+ pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
+ pipe.addLast("handshake-handler", new ClientHandshakeHandler());
+
+ if(pingHandler != null){
+ pipe.addLast("idle-state-handler", pingHandler);
+ }
+
+ pipe.addLast("message-handler", new InboundHandler(connection));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
}
}); //
-// if(TransportCheck.SUPPORTS_EPOLL){
-// b.option(EpollChannelOption.SO_REUSEPORT, true); //
-// }
+ // if(TransportCheck.SUPPORTS_EPOLL){
+ // b.option(EpollChannelOption.SO_REUSEPORT, true); //
+ // }
+ }
+
+ private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+
+ /**
+ * Handler that watches for situations where we haven't written to the socket within a certain timeout. If we exceed
+ * this timeout, we send a PING message to the server to state that we are still alive.
+ */
+ private class IdlePingHandler extends IdleStateHandler {
+
+ private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
+ public void operationComplete(Future<? super Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Unable to maintain connection {}. Closing connection.", connection.getName());
+ connection.close();
+ }
+ }
+ };
+
+ public IdlePingHandler(long idleWaitInMillis) {
+ super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
+ if (evt.state() == IdleState.WRITER_IDLE) {
+ ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
+ }
+ }
}
public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
- public boolean isActive(){
+ public boolean isActive() {
return connection != null
&& connection.getChannel() != null
- && connection.getChannel().isActive() ;
+ && connection.getChannel().isActive();
}
protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
- protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
- protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
- return new ChannelClosedHandler();
- }
+ protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
@@ -118,7 +166,8 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
return true;
}
- protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+ protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
+ String host, int port) {
ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
b.connect(host, port).addListener(cml.connectionHandler);
}
@@ -145,16 +194,17 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
+ // logger.debug("Connection operation finished. Success: {}", future.isSuccess());
try {
future.get();
if (future.isSuccess()) {
- // send a handshake on the current thread. This is the only time we will send from within the event thread. We can do this because the connection will not be backed up.
+ // send a handshake on the current thread. This is the only time we will send from within the event thread.
+ // We can do this because the connection will not be backed up.
send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
} else {
l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
}
-// logger.debug("Handshake queued for send.");
+ // logger.debug("Handshake queued for send.");
} catch (Exception ex) {
l.connectionFailed(FailureType.CONNECTION, ex);
}
@@ -174,12 +224,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
-// logger.debug("Handshake received. {}", value);
+ // logger.debug("Handshake received. {}", value);
try {
BasicClient.this.validateHandshake(value);
BasicClient.this.finalizeConnection(value, connection);
l.connectionSucceeded(connection);
-// logger.debug("Handshake completed succesfully.");
+ // logger.debug("Handshake completed succesfully.");
} catch (RpcException ex) {
l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
}
@@ -205,7 +255,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
- public void setAutoRead(boolean enableAutoRead){
+ public void setAutoRead(boolean enableAutoRead) {
connection.setAutoRead(enableAutoRead);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 08819ca..ab54fa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -18,10 +18,8 @@
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.channel.socket.SocketChannel;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -35,16 +33,13 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
private BufferAllocator alloc;
+ private final String connectionName;
public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
- Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
this.alloc = alloc;
- }
-
- @Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
- return getCloseHandler(clientConnection.getChannel());
+ this.connectionName = connectionName;
}
@Override
@@ -55,16 +50,16 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
@Override
- public ServerConnection initRemoteConnection(Channel channel) {
- return new ServerConnection(channel, alloc);
+ public ServerConnection initRemoteConnection(SocketChannel channel) {
+ return new ServerConnection(connectionName, channel, alloc);
}
public static class ServerConnection extends RemoteConnection{
private final BufferAllocator alloc;
- public ServerConnection(Channel channel, BufferAllocator alloc) {
- super(channel);
+ public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
+ super(channel, name);
this.alloc = alloc;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 3a7032b..a148436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -20,12 +20,13 @@ package org.apache.drill.exec.rpc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import java.net.BindException;
@@ -44,40 +45,52 @@ import com.google.protobuf.Parser;
* requests will generate more than one outbound request.
*/
public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+ protected static final String TIMEOUT_HANDLER = "timeout-handler";
private ServerBootstrap b;
private volatile boolean connect = false;
private final EventLoopGroup eventLoopGroup;
- public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
super(rpcMapping);
this.eventLoopGroup = eventLoopGroup;
- b = new ServerBootstrap() //
- .channel(TransportCheck.getServerSocketChannel()) //
- .option(ChannelOption.SO_BACKLOG, 1000) //
+
+ b = new ServerBootstrap()
+ .channel(TransportCheck.getServerSocketChannel())
+ .option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.SO_RCVBUF, 1 << 17) //
- .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+ .option(ChannelOption.SO_RCVBUF, 1 << 17)
+ .option(ChannelOption.SO_SNDBUF, 1 << 17)
.group(eventLoopGroup) //
- .childOption(ChannelOption.ALLOCATOR, alloc) //
-// .handler(new LoggingHandler(LogLevel.INFO)) //
+ .childOption(ChannelOption.ALLOCATOR, alloc)
+
+ // .handler(new LoggingHandler(LogLevel.INFO))
+
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// logger.debug("Starting initialization of server connection.");
C connection = initRemoteConnection(ch);
- ch.closeFuture().addListener(getCloseHandler(connection));
-
- ch.pipeline().addLast( //
- getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), //
- new RpcDecoder("s-" + rpcConfig.getName()), //
- new RpcEncoder("s-" + rpcConfig.getName()), //
- getHandshakeHandler(connection), new InboundHandler(connection), //
- new RpcExceptionHandler() //
- );
+ ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+ final ChannelPipeline pipe = ch.pipeline();
+ pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
+ pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
+ pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
+ pipe.addLast("handshake-handler", getHandshakeHandler(connection));
+
+ if (rpcMapping.hasTimeout()) {
+ pipe.addLast(TIMEOUT_HANDLER,
+ new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+ }
+
+ pipe.addLast("message-handler", new InboundHandler(connection));
+ pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+
connect = true;
// logger.debug("Server connection initialization completed.");
}
@@ -88,10 +101,33 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
// }
}
+ private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+
+ private final String name;
+ private final int timeoutSeconds;
+ public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+ super(timeoutSeconds);
+ this.name = name;
+ this.timeoutSeconds = timeoutSeconds;
+ }
+
+ @Override
+ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+ logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name,
+ timeoutSeconds);
+ super.readTimedOut(ctx);
+ }
+
+ }
+
public OutOfMemoryHandler getOutOfMemoryHandler() {
return OutOfMemoryHandler.DEFAULT_INSTANCE;
}
+ protected void removeTimeoutHandler() {
+
+ }
+
public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
@Override
@@ -141,7 +177,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
}
@Override
- public C initRemoteConnection(Channel channel) {
+ public C initRemoteConnection(SocketChannel channel) {
return null;
}
@@ -152,7 +188,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
b.bind(++port).sync();
break;
} catch (Exception e) {
- if (e instanceof BindException && allowPortHunting){
+ if (e instanceof BindException && allowPortHunting) {
continue;
}
throw new DrillbitStartupException("Could not bind Drillbit", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 1bb65d3..5a5bbab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -86,7 +86,7 @@ public class CoordinationQueue {
if (future.channel().isActive()) {
throw new RpcException("Future failed") ;
} else {
- throw new ChannelClosedException();
+ setException(new ChannelClosedException());
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index a72dd32..0f095c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutionException;
@@ -30,16 +33,31 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
+ private final String name;
public boolean inEventLoop(){
return channel.eventLoop().inEventLoop();
}
- public RemoteConnection(Channel channel) {
+ public RemoteConnection(SocketChannel channel, String name) {
super();
this.channel = channel;
+ this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
this.writeManager = new WriteManager();
channel.pipeline().addLast(new BackPressureHandler());
+ channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
+ public void operationComplete(Future<? super Void> future) throws Exception {
+ // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
+ // fail out any pending messages
+ writeManager.disable();
+ writeManager.setWritable(true);
+ }
+ });
+
+ }
+
+ public String getName() {
+ return name;
}
public abstract BufferAllocator getAllocator();
@@ -72,6 +90,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
*/
private static class WriteManager{
private final ResettableBarrier barrier = new ResettableBarrier();
+ private volatile boolean disabled = false;
public WriteManager(){
barrier.openBarrier();
@@ -82,15 +101,17 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
}
public void setWritable(boolean isWritable){
-// logger.debug("Set writable: {}", isWritable);
if(isWritable){
barrier.openBarrier();
- }else{
+ } else if (!disabled) {
barrier.closeBarrier();
}
}
+ public void disable() {
+ disabled = true;
+ }
}
private class BackPressureHandler extends ChannelInboundHandlerAdapter{
@@ -107,7 +128,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
@Override
public void close() {
try {
- channel.close().get();
+ if (channel.isActive()) {
+ channel.close().get();
+ }
} catch (InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
// TODO InterruptedException
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b165b53..92ce312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,10 +23,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
@@ -125,25 +127,36 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
}
- public abstract C initRemoteConnection(Channel channel);
+ public abstract C initRemoteConnection(SocketChannel channel);
public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
+
+ final InetSocketAddress local;
+ final InetSocketAddress remote;
+ final C clientConnection;
+
+ public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
+ this.local = local;
+ this.remote = remote;
+ this.clientConnection = clientConnection;
+ }
+
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
- .remoteAddress());
- closeQueueDueToChannelClose();
- }
- }
+ String msg = String.format("Channel closed %s <--> %s.", local, remote);
+ if (RpcBus.this.isClient()) {
+ logger.info(String.format(msg));
+ } else {
+ queue.channelClosed(new ChannelClosedException(msg));
+ }
- protected void closeQueueDueToChannelClose() {
- if (this.isClient()) {
- queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure."));
+ clientConnection.close();
}
+
}
- protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
- return new ChannelClosedHandler();
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
+ return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
}
private class ResponseSenderImpl implements ResponseSender {
@@ -170,8 +183,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
+ private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
+
private final C connection;
public InboundHandler(C connection) {
super();
@@ -179,67 +195,84 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
@Override
- protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception {
+ protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
if (!ctx.channel().isOpen()) {
return;
}
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received message {}", msg);
}
- switch (msg.mode) {
- case REQUEST: {
- // handle message and ack.
- ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
- try {
- handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
- } catch(UserRpcException e){
- UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
-
- logger.error("Unexpected Error while handling request message", e);
-
- OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId,
- uex.getOrCreatePBError(false));
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Adding message to outbound buffer. {}", outMessage);
+ final Channel channel = connection.getChannel();
+
+ try{
+ switch (msg.mode) {
+ case REQUEST: {
+ // handle message and ack.
+
+ try {
+ ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
+ handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+ } catch (UserRpcException e) {
+ UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
+
+ logger.error("Unexpected Error while handling request message", e);
+
+ OutboundRpcMessage outMessage = new OutboundRpcMessage(
+ RpcMode.RESPONSE_FAILURE,
+ 0,
+ msg.coordinationId,
+ uex.getOrCreatePBError(false)
+ );
+
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Adding message to outbound buffer. {}", outMessage);
+ }
+
+ channel.writeAndFlush(outMessage);
}
- connection.getChannel().writeAndFlush(outMessage);
+ break;
}
- msg.release(); // we release our ownership. Handle could have taken over ownership.
- break;
- }
- case RESPONSE:
- try{
- MessageLite m = getResponseDefaultInstance(msg.rpcType);
- assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
- RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
- Parser<?> parser = m.getParserForType();
- Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- rpcFuture.set(value, msg.dBody);
- msg.release(); // we release our ownership. Handle could have taken over ownership.
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
- }
- }catch(Exception ex) {
- logger.error("Failure while handling response.", ex);
- throw ex;
- }
- break;
+ case RESPONSE:
+ try {
+ MessageLite m = getResponseDefaultInstance(msg.rpcType);
+ assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+ RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+ Parser<?> parser = m.getParserForType();
+ Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+ rpcFuture.set(value, msg.dBody);
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+ }
+ } catch (Exception ex) {
+ logger.error("Failure while handling response.", ex);
+ throw ex;
+ }
+ break;
- case RESPONSE_FAILURE:
- DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- queue.updateFailedFuture(msg.coordinationId, failure);
- msg.release();
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
- }
- break;
+ case RESPONSE_FAILURE:
+ DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+ queue.updateFailedFuture(msg.coordinationId, failure);
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Updated rpc future with coordinationId {} with failure {}", msg.coordinationId, failure);
+ }
+ break;
+
+ case PING:
+ connection.getChannel().writeAndFlush(PONG);
+ break;
+
+ case PONG:
+ // noop.
+ break;
- default:
- throw new UnsupportedOperationException();
+ default:
+ throw new UnsupportedOperationException();
+ }
+ } finally {
+ msg.release();
}
}
-
}
public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
index b5974f6..ab6c375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
import java.util.Map;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.Internal.EnumLite;
@@ -28,11 +29,14 @@ public class RpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
private final String name;
+ private final int timeout;
private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
- private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap) {
+ private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
+ Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout) {
this.name = name;
+ this.timeout = timeout;
this.sendMap = ImmutableMap.copyOf(sendMap);
this.receiveMap = ImmutableMap.copyOf(receiveMap);
}
@@ -41,6 +45,13 @@ public class RpcConfig {
return name;
}
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public boolean hasTimeout() {
+ return timeout > 0;
+ }
public boolean checkReceive(int rpcType, Class<?> receiveClass) {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
@@ -134,17 +145,27 @@ public class RpcConfig {
}
- public static RpcConfigBuilder newBuilder(String name) {
- return new RpcConfigBuilder(name);
+ public static RpcConfigBuilder newBuilder() {
+ return new RpcConfigBuilder();
}
public static class RpcConfigBuilder {
- private final String name;
+ private String name;
+ private int timeout = -1;
private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
- private RpcConfigBuilder(String name) {
+ private RpcConfigBuilder() {
+ }
+
+ public RpcConfigBuilder name(String name) {
this.name = name;
+ return this;
+ }
+
+ public RpcConfigBuilder timeout(int timeoutInSeconds) {
+ this.timeout = timeoutInSeconds;
+ return this;
}
public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite> RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
@@ -155,7 +176,9 @@ public class RpcConfig {
}
public RpcConfig build() {
- return new RpcConfig(name, sendMap, receiveMap);
+ Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
+ Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
+ return new RpcConfig(name, sendMap, receiveMap, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index 537452e..c12ff7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -23,23 +23,23 @@ import io.netty.channel.ChannelHandlerContext;
public class RpcExceptionHandler implements ChannelHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
- public RpcExceptionHandler(){
- }
+ private final String name;
+ public RpcExceptionHandler(String name) {
+ this.name = name;
+ }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
- if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
+ if(!ctx.channel().isOpen() || "Connection reset by peer".equals(cause.getMessage())){ // constant-first: getMessage() may be null
- logger.warn("Exception with closed channel", cause);
+ logger.warn("Exception occurred with closed channel. Connection: {}", name, cause);
return;
}else{
- logger.error("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+ logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause);
ctx.close();
}
}
-
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index d546db3..f191271 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -50,7 +50,8 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
private final BufferAllocator allocator;
public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
- super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
+ super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
this.localIdentity = localEndpoint;
this.remoteEndpoint = remoteEndpoint;
this.handler = handler;
@@ -64,14 +65,15 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@SuppressWarnings("unchecked")
@Override
- public ControlConnection initRemoteConnection(Channel channel) {
- this.connection = new ControlConnection(channel, (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+ public ControlConnection initRemoteConnection(SocketChannel channel) {
+ this.connection = new ControlConnection("control client", channel,
+ (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
return connection;
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection clientConnection) {
- return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection clientConnection) {
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a7aaa9c..49f0f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import java.util.UUID;
@@ -41,8 +41,9 @@ public class ControlConnection extends RemoteConnection {
private volatile boolean active = false;
private final UUID id;
- public ControlConnection(Channel channel, RpcBus<RpcType, ControlConnection> bus, BufferAllocator allocator) {
- super(channel);
+ public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
+ BufferAllocator allocator) {
+ super(channel, name);
this.bus = bus;
this.id = UUID.randomUUID();
this.allocator = allocator;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 37730e3..f92bb49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -17,7 +17,8 @@
*/
package org.apache.drill.exec.rpc.control;
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -34,15 +35,19 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class ControlRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
- .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
- .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("CONTROL")
+ .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
+ .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+ .build();
+ }
public static int RPC_VERSION = 3;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 43089d3..5e405ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.control;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -44,7 +44,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
private BufferAllocator allocator;
public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
- super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+ super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup());
this.handler = handler;
this.connectionRegistry = connectionRegistry;
this.allocator = context.getAllocator();
@@ -61,14 +62,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
return proxyCloseHandler;
}
@Override
- public ControlConnection initRemoteConnection(Channel channel) {
- return new ControlConnection(channel, this, allocator);
+ public ControlConnection initRemoteConnection(SocketChannel channel) {
+ return new ControlConnection("control server", channel, this, allocator);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
index 1d539a2..44c8ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.RemoteConnection;
@@ -26,8 +26,9 @@ public class BitServerConnection extends RemoteConnection{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
private final BufferAllocator allocator;
- public BitServerConnection(Channel channel, BufferAllocator allocator) {
- super(channel);
+
+ public BitServerConnection(SocketChannel channel, BufferAllocator allocator) {
+ super(channel, "data server");
this.allocator = allocator;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 8e2507b..b8a07c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,8 +18,8 @@
package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,21 +47,22 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
- super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
+ super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
this.remoteEndpoint = remoteEndpoint;
this.closeHandlerFactory = closeHandlerFactory;
this.allocator = context.getAllocator();
}
@Override
- public DataClientConnection initRemoteConnection(Channel channel) {
+ public DataClientConnection initRemoteConnection(SocketChannel channel) {
this.connection = new DataClientConnection(channel, this);
return connection;
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(DataClientConnection clientConnection) {
- return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 3a569db..eb5778d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
import java.util.UUID;
@@ -36,8 +36,8 @@ public class DataClientConnection extends RemoteConnection{
private final DataClient client;
private final UUID id;
- public DataClientConnection(Channel channel, DataClient client) {
- super(channel);
+ public DataClientConnection(SocketChannel channel, DataClient client) {
+ super(channel, "data client");
this.client = client;
// we use a local listener pool unless a global one is provided.
this.id = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 807b6c3..c5cf498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -17,7 +17,8 @@
*/
package org.apache.drill.exec.rpc.data;
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
@@ -30,10 +31,14 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class DataRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-DATA-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
- .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("DATA")
+ .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
+ .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+ .build();
+ }
public static int RPC_VERSION = 4;
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 6f8e20b..0d4077e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc.data;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
@@ -57,7 +57,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
private final DataResponseHandler dataHandler;
public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
- super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+ super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+ .getBitLoopGroup());
this.context = context;
this.workBus = workBus;
this.dataHandler = dataHandler;
@@ -69,13 +70,13 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(BitServerConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
return proxyCloseHandler;
}
@Override
- public BitServerConnection initRemoteConnection(Channel channel) {
+ public BitServerConnection initRemoteConnection(SocketChannel channel) {
return new BitServerConnection(channel, context.getAllocator());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5e3e937..302be72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,18 +19,22 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
@@ -63,8 +67,9 @@ public class QueryResultHandler {
private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
Maps.newConcurrentMap();
- public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
- return new SubmissionListener(resultsListener);
+ public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
+ UserResultsListener resultsListener) {
+ return new SubmissionListener(connection, resultsListener);
}
/**
@@ -268,12 +273,31 @@ public class QueryResultHandler {
}
+
private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
- private UserResultsListener resultsListener;
+ private final UserResultsListener resultsListener;
+ private final RemoteConnection connection;
+ private final ChannelFuture closeFuture;
+ private final ChannelClosedListener closeListener;
- public SubmissionListener(UserResultsListener resultsListener) {
+ public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
super();
this.resultsListener = resultsListener;
+ this.connection = connection;
+ this.closeFuture = connection.getChannel().closeFuture();
+ this.closeListener = new ChannelClosedListener();
+ closeFuture.addListener(closeListener);
+ }
+
+ private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {
+
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ resultsListener.submissionFailed(UserException.connectionError()
+ .message("Connection %s closed unexpectedly.", connection.getName())
+ .build());
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a8bad78..b39a103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack.Builder;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,13 +51,21 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
private boolean supportComplexTypes = true;
- public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+ public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
+ EventLoopGroup eventLoopGroup) {
+ super(
+ UserRpcConfig.getMapping(config),
+ alloc,
+ eventLoopGroup,
+ RpcType.HANDSHAKE,
+ BitToUserHandshake.class,
+ BitToUserHandshake.PARSER,
+ "user client");
this.supportComplexTypes = supportComplexTypes;
}
public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
- send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+ send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
@@ -66,6 +74,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
.setSupportComplexTypes(supportComplexTypes)
+ .setSupportTimeout(true)
.setCredentials(credentials);
if (props != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 88592d4..ae728d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,10 +17,12 @@
*/
package org.apache.drill.exec.rpc.user;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -30,13 +32,17 @@ import org.apache.drill.exec.rpc.RpcConfig;
public class UserRpcConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
- public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
- .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
- .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
- .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
- .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
- .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
- .build();
+ public static RpcConfig getMapping(DrillConfig config) {
+ return RpcConfig.newBuilder()
+ .name("USER")
+ .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
+ .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+ .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
+ .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
+ .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
+ .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+ .build();
+ }
public static int RPC_VERSION = 5;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9e929de..b3b7ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,12 +17,11 @@
*/
package org.apache.drill.exec.rpc.user;
-import com.google.common.io.Closeables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.util.UUID;
@@ -56,6 +55,7 @@ import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
import org.apache.drill.exec.work.user.UserWorker;
+import com.google.common.io.Closeables;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
@@ -68,7 +68,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public UserServer(DrillConfig config, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
UserWorker worker) throws DrillbitStartupException {
- super(UserRpcConfig.MAPPING, alloc.getUnderlyingAllocator(), eventLoopGroup);
+ super(UserRpcConfig.getMapping(config), alloc.getUnderlyingAllocator(), eventLoopGroup);
this.worker = worker;
this.alloc = alloc;
if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
@@ -123,8 +123,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
private UserSession session;
- public UserClientConnection(Channel channel) {
- super(channel);
+ public UserClientConnection(SocketChannel channel) {
+ super(channel, "user client");
+ }
+
+ void disableReadTimeout() {
+ getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
}
void setUser(UserToBitHandshake inbound) throws IOException {
@@ -161,7 +165,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- public UserClientConnection initRemoteConnection(Channel channel) {
+ public UserClientConnection initRemoteConnection(SocketChannel channel) {
return new UserClientConnection(channel);
}
@@ -186,6 +190,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public BitToUserHandshake getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
logger.trace("Handling handshake from user to bit. {}", inbound);
+
+ // If the client did not send supportTimeout, or sent it as false, remove the read-timeout handler for this connection.
+ if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
+ connection.disableReadTimeout();
+ logger.warn("Timeout Disabled as client doesn't support it. Connection: {}", connection.getName());
+ }
+
BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION);
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 49d0c94..b7ef584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Collection;
@@ -121,6 +124,9 @@ public class Foreman implements Runnable {
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
+ private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
+ private final ChannelFuture closeFuture;
+
/**
* Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -139,6 +145,9 @@ public class Foreman implements Runnable {
this.drillbitContext = drillbitContext;
initiatingClient = connection;
+ this.closeFuture = initiatingClient.getChannel().closeFuture();
+ closeFuture.addListener(closeListener);
+
queryContext = new QueryContext(connection.getSession(), drillbitContext);
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
@@ -146,6 +155,13 @@ public class Foreman implements Runnable {
recordNewState(QueryState.PENDING);
}
+ private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ cancel();
+ }
+ }
+
/**
* Get the QueryContext created for the query.
*
@@ -603,6 +619,9 @@ public class Foreman implements Runnable {
logger.info("foreman cleaning up.");
injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
+ // remove the channel disconnected listener (doesn't throw)
+ closeFuture.removeListener(closeListener);
+
// These are straight forward removals from maps, so they won't throw.
drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8006533..d98b97a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -27,6 +27,7 @@ drill.exec: {
cluster-id: "drillbits1"
rpc: {
user: {
+ timeout: 30,
server: {
port: 31010
threads: 1
@@ -36,6 +37,7 @@ drill.exec: {
}
},
bit: {
+ timeout: 30,
server: {
port : 31011,
retry:{
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
index f47e719..c28cc29 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
@@ -42,6 +42,14 @@ public final class GeneralRPCProtos {
* <code>RESPONSE_FAILURE = 2;</code>
*/
RESPONSE_FAILURE(2, 2),
+ /**
+ * <code>PING = 3;</code>
+ */
+ PING(3, 3),
+ /**
+ * <code>PONG = 4;</code>
+ */
+ PONG(4, 4),
;
/**
@@ -56,6 +64,14 @@ public final class GeneralRPCProtos {
* <code>RESPONSE_FAILURE = 2;</code>
*/
public static final int RESPONSE_FAILURE_VALUE = 2;
+ /**
+ * <code>PING = 3;</code>
+ */
+ public static final int PING_VALUE = 3;
+ /**
+ * <code>PONG = 4;</code>
+ */
+ public static final int PONG_VALUE = 4;
public final int getNumber() { return value; }
@@ -65,6 +81,8 @@ public final class GeneralRPCProtos {
case 0: return REQUEST;
case 1: return RESPONSE;
case 2: return RESPONSE_FAILURE;
+ case 3: return PING;
+ case 4: return PONG;
default: return null;
}
}
@@ -1972,10 +1990,10 @@ public final class GeneralRPCProtos {
"rdination_id\030\002 \001(\005\022\020\n\010rpc_type\030\003 \001(\005\"b\n\022" +
"CompleteRpcMessage\022#\n\006header\030\001 \001(\0132\023.exe" +
"c.rpc.RpcHeader\022\025\n\rprotobuf_body\030\002 \001(\014\022\020" +
- "\n\010raw_body\030\003 \001(\014*:\n\007RpcMode\022\013\n\007REQUEST\020\000" +
- "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002B1\n\033" +
- "org.apache.drill.exec.protoB\020GeneralRPCP" +
- "rotosH\001"
+ "\n\010raw_body\030\003 \001(\014*N\n\007RpcMode\022\013\n\007REQUEST\020\000" +
+ "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002\022\010\n\004" +
+ "PING\020\003\022\010\n\004PONG\020\004B1\n\033org.apache.drill.exe" +
+ "c.protoB\020GeneralRPCProtosH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index d587dfc..6fc43bb 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -280,6 +280,8 @@ public final class SchemaUserProtos
if(message.hasSupportComplexTypes())
output.writeBool(6, message.getSupportComplexTypes(), false);
+ if(message.hasSupportTimeout())
+ output.writeBool(7, message.getSupportTimeout(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message)
{
@@ -339,6 +341,9 @@ public final class SchemaUserProtos
case 6:
builder.setSupportComplexTypes(input.readBool());
break;
+ case 7:
+ builder.setSupportTimeout(input.readBool());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -385,6 +390,7 @@ public final class SchemaUserProtos
case 4: return "credentials";
case 5: return "properties";
case 6: return "supportComplexTypes";
+ case 7: return "supportTimeout";
default: return null;
}
}
@@ -402,6 +408,7 @@ public final class SchemaUserProtos
fieldMap.put("credentials", 4);
fieldMap.put("properties", 5);
fieldMap.put("supportComplexTypes", 6);
+ fieldMap.put("supportTimeout", 7);
}
}