You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/09 17:51:48 UTC

[2/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.

DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure.  Add RPC timeout.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/960f876a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/960f876a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/960f876a

Branch: refs/heads/master
Commit: 960f876a1945eee4eeb0d6a98c5c58dfe2eea1a9
Parents: d4f9bf2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri May 8 08:15:36 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 07:12:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../apache/drill/exec/client/DrillClient.java   |   8 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  | 110 ++++++++++----
 .../exec/rpc/BasicClientWithConnection.java     |  21 +--
 .../org/apache/drill/exec/rpc/BasicServer.java  |  78 +++++++---
 .../drill/exec/rpc/CoordinationQueue.java       |   2 +-
 .../apache/drill/exec/rpc/RemoteConnection.java |  31 +++-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 151 +++++++++++--------
 .../org/apache/drill/exec/rpc/RpcConfig.java    |  35 ++++-
 .../drill/exec/rpc/RpcExceptionHandler.java     |  12 +-
 .../drill/exec/rpc/control/ControlClient.java   |  14 +-
 .../exec/rpc/control/ControlConnection.java     |   7 +-
 .../exec/rpc/control/ControlRpcConfig.java      |  25 +--
 .../drill/exec/rpc/control/ControlServer.java   |  13 +-
 .../exec/rpc/data/BitServerConnection.java      |   7 +-
 .../apache/drill/exec/rpc/data/DataClient.java  |  11 +-
 .../exec/rpc/data/DataClientConnection.java     |   6 +-
 .../drill/exec/rpc/data/DataRpcConfig.java      |  15 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |  11 +-
 .../drill/exec/rpc/user/QueryResultHandler.java |  36 ++++-
 .../apache/drill/exec/rpc/user/UserClient.java  |  19 ++-
 .../drill/exec/rpc/user/UserRpcConfig.java      |  22 ++-
 .../apache/drill/exec/rpc/user/UserServer.java  |  23 ++-
 .../apache/drill/exec/work/foreman/Foreman.java |  19 +++
 .../src/main/resources/drill-module.conf        |   2 +
 .../drill/exec/proto/GeneralRPCProtos.java      |  26 +++-
 .../drill/exec/proto/SchemaUserProtos.java      |   7 +
 .../org/apache/drill/exec/proto/UserProtos.java | 123 ++++++++++++---
 .../apache/drill/exec/proto/beans/RpcMode.java  |   6 +-
 .../exec/proto/beans/UserToBitHandshake.java    |  23 +++
 protocol/src/main/protobuf/GeneralRPC.proto     |   2 +
 protocol/src/main/protobuf/User.proto           |   1 +
 32 files changed, 626 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be8c7a0..97d5770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -42,7 +42,9 @@ public interface ExecConstants {
   public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
   public static final String SERVICE_NAME = "drill.exec.cluster-id";
   public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+  public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
   public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
+  public static final String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";
   public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
   public static final String FUNCTION_PACKAGES = "drill.exec.functions";
   public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 9924704..136d8c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -20,20 +20,17 @@ package org.apache.drill.exec.client;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
-
-import com.google.common.base.Strings;
 import io.netty.buffer.DrillBuf;
+import io.netty.channel.EventLoopGroup;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Vector;
 
-import io.netty.channel.EventLoopGroup;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -63,6 +60,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -208,7 +206,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
-    client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
+    client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup);
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
     connect(endpoint);
     connected = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 72ae130..1661f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -20,18 +20,24 @@ package org.apache.drill.exec.rpc;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 
 import com.google.protobuf.Internal.EnumLite;
@@ -40,7 +46,12 @@ import com.google.protobuf.Parser;
 
 public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
     extends RpcBus<T, R> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+
+  // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
+  // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
+  // seconds, the idle state handler will send a ping message.
+  private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
 
   private final Bootstrap b;
   protected R connection;
@@ -48,18 +59,23 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
   private final Class<HANDSHAKE_RESPONSE> responseClass;
   private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
 
+  private final IdlePingHandler pingHandler;
+
   public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
       Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
     super(rpcMapping);
     this.responseClass = responseClass;
     this.handshakeType = handshakeType;
     this.handshakeParser = handshakeParser;
+    final long timeoutInMillis = rpcMapping.hasTimeout() ? (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING)
+        : -1;
+    this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
 
     b = new Bootstrap() //
         .group(eventLoopGroup) //
         .channel(TransportCheck.getClientSocketChannel()) //
         .option(ChannelOption.ALLOCATOR, alloc) //
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.SO_RCVBUF, 1 << 17) //
         .option(ChannelOption.SO_SNDBUF, 1 << 17) //
@@ -68,40 +84,72 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
-//            logger.debug("initializing client connection.");
+            // logger.debug("initializing client connection.");
             connection = initRemoteConnection(ch);
-            ch.closeFuture().addListener(getCloseHandler(connection));
-
-            ch.pipeline().addLast( //
-                getDecoder(connection.getAllocator()), //
-                new RpcDecoder("c-" + rpcConfig.getName()), //
-                new RpcEncoder("c-" + rpcConfig.getName()), //
-                new ClientHandshakeHandler(), //
-                new InboundHandler(connection), //
-                new RpcExceptionHandler() //
-                );
+
+            ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+            final ChannelPipeline pipe = ch.pipeline();
+
+            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
+            pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
+            pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
+            pipe.addLast("handshake-handler", new ClientHandshakeHandler());
+
+            if(pingHandler != null){
+              pipe.addLast("idle-state-handler", pingHandler);
+            }
+
+            pipe.addLast("message-handler", new InboundHandler(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
           }
         }); //
 
-//    if(TransportCheck.SUPPORTS_EPOLL){
-//      b.option(EpollChannelOption.SO_REUSEPORT, true); //
-//    }
+    // if(TransportCheck.SUPPORTS_EPOLL){
+    // b.option(EpollChannelOption.SO_REUSEPORT, true); //
+    // }
+  }
+
+  private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+
+  /**
+   * Handler that watches for situations where we haven't read from the socket in a certain timeout. If we exceed this
+   * timeout, we send a PING message to the server to state that we are still alive.
+   */
+  private class IdlePingHandler extends IdleStateHandler {
+
+    private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
+      public void operationComplete(Future<? super Void> future) throws Exception {
+        if (!future.isSuccess()) {
+          logger.error("Unable to maintain connection {}.  Closing connection.", connection.getName());
+          connection.close();
+        }
+      }
+    };
+
+    public IdlePingHandler(long idleWaitInMillis) {
+      super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
+      if (evt.state() == IdleState.WRITER_IDLE) {
+        ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
+      }
+    }
   }
 
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
 
-  public boolean isActive(){
+  public boolean isActive() {
     return connection != null
         && connection.getChannel() != null
-        && connection.getChannel().isActive() ;
+        && connection.getChannel().isActive();
   }
 
   protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
-  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
 
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
-    return new ChannelClosedHandler();
-  }
+  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
 
   public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
       T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
@@ -118,7 +166,8 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return true;
   }
 
-  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
+      String host, int port) {
     ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
     b.connect(host, port).addListener(cml.connectionHandler);
   }
@@ -145,16 +194,17 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
-//        logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
+        // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
         try {
           future.get();
           if (future.isSuccess()) {
-            // send a handshake on the current thread.  This is the only time we will send from within the event thread.  We can do this because the connection will not be backed up.
+            // send a handshake on the current thread. This is the only time we will send from within the event thread.
+            // We can do this because the connection will not be backed up.
             send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
           } else {
             l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
           }
-//          logger.debug("Handshake queued for send.");
+          // logger.debug("Handshake queued for send.");
         } catch (Exception ex) {
           l.connectionFailed(FailureType.CONNECTION, ex);
         }
@@ -174,12 +224,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
-//        logger.debug("Handshake received. {}", value);
+        // logger.debug("Handshake received. {}", value);
         try {
           BasicClient.this.validateHandshake(value);
           BasicClient.this.finalizeConnection(value, connection);
           l.connectionSucceeded(connection);
-//          logger.debug("Handshake completed succesfully.");
+          // logger.debug("Handshake completed succesfully.");
         } catch (RpcException ex) {
           l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
         }
@@ -205,7 +255,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
   }
 
-  public void setAutoRead(boolean enableAutoRead){
+  public void setAutoRead(boolean enableAutoRead) {
     connection.setAutoRead(enableAutoRead);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 08819ca..ab54fa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -18,10 +18,8 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.channel.socket.SocketChannel;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -35,16 +33,13 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
 
   private BufferAllocator alloc;
+  private final String connectionName;
 
   public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
     super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
     this.alloc = alloc;
-  }
-
-  @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
-    return getCloseHandler(clientConnection.getChannel());
+    this.connectionName = connectionName;
   }
 
   @Override
@@ -55,16 +50,16 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
   protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
 
   @Override
-  public ServerConnection initRemoteConnection(Channel channel) {
-    return new ServerConnection(channel, alloc);
+  public ServerConnection initRemoteConnection(SocketChannel channel) {
+    return new ServerConnection(connectionName, channel, alloc);
   }
 
   public static class ServerConnection extends RemoteConnection{
 
     private final BufferAllocator alloc;
 
-    public ServerConnection(Channel channel, BufferAllocator alloc) {
-      super(channel);
+    public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
+      super(channel, name);
       this.alloc = alloc;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 3a7032b..a148436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -20,12 +20,13 @@ package org.apache.drill.exec.rpc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 
 import java.io.IOException;
 import java.net.BindException;
@@ -44,40 +45,52 @@ import com.google.protobuf.Parser;
  * requests will generate more than one outbound request.
  */
 public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+  protected static final String TIMEOUT_HANDLER = "timeout-handler";
 
   private ServerBootstrap b;
   private volatile boolean connect = false;
   private final EventLoopGroup eventLoopGroup;
 
-  public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
     super(rpcMapping);
     this.eventLoopGroup = eventLoopGroup;
-    b = new ServerBootstrap() //
-        .channel(TransportCheck.getServerSocketChannel()) //
-        .option(ChannelOption.SO_BACKLOG, 1000) //
+
+    b = new ServerBootstrap()
+        .channel(TransportCheck.getServerSocketChannel())
+        .option(ChannelOption.SO_BACKLOG, 1000)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
         .option(ChannelOption.TCP_NODELAY, true)
         .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
-        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+        .option(ChannelOption.SO_RCVBUF, 1 << 17)
+        .option(ChannelOption.SO_SNDBUF, 1 << 17)
         .group(eventLoopGroup) //
-        .childOption(ChannelOption.ALLOCATOR, alloc) //
-//        .handler(new LoggingHandler(LogLevel.INFO)) //
+        .childOption(ChannelOption.ALLOCATOR, alloc)
+
+        // .handler(new LoggingHandler(LogLevel.INFO))
+
         .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
 //            logger.debug("Starting initialization of server connection.");
             C connection = initRemoteConnection(ch);
-            ch.closeFuture().addListener(getCloseHandler(connection));
-
-            ch.pipeline().addLast( //
-                getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), //
-                new RpcDecoder("s-" + rpcConfig.getName()), //
-                new RpcEncoder("s-" + rpcConfig.getName()), //
-                getHandshakeHandler(connection), new InboundHandler(connection), //
-                new RpcExceptionHandler() //
-                );
+            ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+            final ChannelPipeline pipe = ch.pipeline();
+            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
+            pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
+            pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
+            pipe.addLast("handshake-handler", getHandshakeHandler(connection));
+
+            if (rpcMapping.hasTimeout()) {
+              pipe.addLast(TIMEOUT_HANDLER,
+                  new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+            }
+
+            pipe.addLast("message-handler", new InboundHandler(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+
             connect = true;
 //            logger.debug("Server connection initialization completed.");
           }
@@ -88,10 +101,33 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 //     }
   }
 
+  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+
+    private final String name;
+    private final int timeoutSeconds;
+    public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+      super(timeoutSeconds);
+      this.name = name;
+      this.timeoutSeconds = timeoutSeconds;
+    }
+
+    @Override
+    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", name,
+          timeoutSeconds);
+      super.readTimedOut(ctx);
+    }
+
+  }
+
   public OutOfMemoryHandler getOutOfMemoryHandler() {
     return OutOfMemoryHandler.DEFAULT_INSTANCE;
   }
 
+  protected void removeTimeoutHandler() {
+
+  }
+
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
 
   @Override
@@ -141,7 +177,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   }
 
   @Override
-  public C initRemoteConnection(Channel channel) {
+  public C initRemoteConnection(SocketChannel channel) {
     return null;
   }
 
@@ -152,7 +188,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
         b.bind(++port).sync();
         break;
       } catch (Exception e) {
-        if (e instanceof BindException && allowPortHunting){
+        if (e instanceof BindException && allowPortHunting) {
           continue;
         }
         throw new DrillbitStartupException("Could not bind Drillbit", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 1bb65d3..5a5bbab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -86,7 +86,7 @@ public class CoordinationQueue {
         if (future.channel().isActive()) {
            throw new RpcException("Future failed") ;
         } else {
-          throw  new ChannelClosedException();
+          setException(new ChannelClosedException());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index a72dd32..0f095c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.rpc;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ExecutionException;
 
@@ -30,16 +33,31 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
+  private final String name;
 
   public boolean inEventLoop(){
     return channel.eventLoop().inEventLoop();
   }
 
-  public RemoteConnection(Channel channel) {
+  public RemoteConnection(SocketChannel channel, String name) {
     super();
     this.channel = channel;
+    this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
+    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
+      public void operationComplete(Future<? super Void> future) throws Exception {
+        // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
+        // fail out any pending messages
+        writeManager.disable();
+        writeManager.setWritable(true);
+      }
+    });
+
+  }
+
+  public String getName() {
+    return name;
   }
 
   public abstract BufferAllocator getAllocator();
@@ -72,6 +90,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
    */
   private static class WriteManager{
     private final ResettableBarrier barrier = new ResettableBarrier();
+    private volatile boolean disabled = false;
 
     public WriteManager(){
       barrier.openBarrier();
@@ -82,15 +101,17 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
     }
 
     public void setWritable(boolean isWritable){
-//      logger.debug("Set writable: {}", isWritable);
       if(isWritable){
         barrier.openBarrier();
-      }else{
+      } else if (!disabled) {
         barrier.closeBarrier();
       }
 
     }
 
+    public void disable() {
+      disabled = true;
+    }
   }
 
   private class BackPressureHandler extends ChannelInboundHandlerAdapter{
@@ -107,7 +128,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   @Override
   public void close() {
     try {
-      channel.close().get();
+      if (channel.isActive()) {
+        channel.close().get();
+      }
     } catch (InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
       // TODO InterruptedException

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b165b53..92ce312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,10 +23,12 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.List;
 
@@ -125,25 +127,36 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
   }
 
-  public abstract C initRemoteConnection(Channel channel);
+  public abstract C initRemoteConnection(SocketChannel channel);
 
   public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
+
+    final InetSocketAddress local;
+    final InetSocketAddress remote;
+    final C clientConnection;
+
+    public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
+      this.local = local;
+      this.remote = remote;
+      this.clientConnection = clientConnection;
+    }
+
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
-          .remoteAddress());
-      closeQueueDueToChannelClose();
-    }
-  }
+      String msg = String.format("Channel closed %s <--> %s.", local, remote);
+      if (RpcBus.this.isClient()) {
+        logger.info(String.format(msg));
+      } else {
+        queue.channelClosed(new ChannelClosedException(msg));
+      }
 
-  protected void closeQueueDueToChannelClose() {
-    if (this.isClient()) {
-      queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure."));
+      clientConnection.close();
     }
+
   }
 
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
-    return new ChannelClosedHandler();
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
+    return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
   }
 
   private class ResponseSenderImpl implements ResponseSender {
@@ -170,8 +183,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   }
 
+  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
   protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
 
+
     private final C connection;
     public InboundHandler(C connection) {
       super();
@@ -179,67 +195,84 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
 
     @Override
-    protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception {
+    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
       if (!ctx.channel().isOpen()) {
         return;
       }
       if (RpcConstants.EXTRA_DEBUGGING) {
         logger.debug("Received message {}", msg);
       }
-      switch (msg.mode) {
-      case REQUEST: {
-        // handle message and ack.
-        ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
-        try {
-          handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
-        } catch(UserRpcException e){
-          UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
-
-          logger.error("Unexpected Error while handling request message", e);
-
-          OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId,
-            uex.getOrCreatePBError(false));
-          if (RpcConstants.EXTRA_DEBUGGING) {
-            logger.debug("Adding message to outbound buffer. {}", outMessage);
+      final Channel channel = connection.getChannel();
+
+      try{
+        switch (msg.mode) {
+        case REQUEST: {
+          // handle message and ack.
+
+          try {
+            ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
+            handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+          } catch (UserRpcException e) {
+            UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
+
+            logger.error("Unexpected Error while handling request message", e);
+
+            OutboundRpcMessage outMessage = new OutboundRpcMessage(
+                RpcMode.RESPONSE_FAILURE,
+                0,
+                msg.coordinationId,
+                uex.getOrCreatePBError(false)
+                );
+
+            if (RpcConstants.EXTRA_DEBUGGING) {
+              logger.debug("Adding message to outbound buffer. {}", outMessage);
+            }
+
+            channel.writeAndFlush(outMessage);
           }
-          connection.getChannel().writeAndFlush(outMessage);
+          break;
         }
-        msg.release();  // we release our ownership.  Handle could have taken over ownership.
-        break;
-      }
 
-      case RESPONSE:
-        try{
-        MessageLite m = getResponseDefaultInstance(msg.rpcType);
-        assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
-        RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
-        Parser<?> parser = m.getParserForType();
-        Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        rpcFuture.set(value, msg.dBody);
-        msg.release();  // we release our ownership.  Handle could have taken over ownership.
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
-        }
-        }catch(Exception ex) {
-          logger.error("Failure while handling response.", ex);
-          throw ex;
-        }
-        break;
+        case RESPONSE:
+          try {
+            MessageLite m = getResponseDefaultInstance(msg.rpcType);
+            assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+            RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+            Parser<?> parser = m.getParserForType();
+            Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+            rpcFuture.set(value, msg.dBody);
+            if (RpcConstants.EXTRA_DEBUGGING) {
+              logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+            }
+          } catch (Exception ex) {
+            logger.error("Failure while handling response.", ex);
+            throw ex;
+          }
+          break;
 
-      case RESPONSE_FAILURE:
-        DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        queue.updateFailedFuture(msg.coordinationId, failure);
-        msg.release();
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
-        }
-        break;
+        case RESPONSE_FAILURE:
+          DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+          queue.updateFailedFuture(msg.coordinationId, failure);
+          if (RpcConstants.EXTRA_DEBUGGING) {
+            logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+          }
+          break;
+
+        case PING:
+          connection.getChannel().writeAndFlush(PONG);
+          break;
+
+        case PONG:
+          // noop.
+          break;
 
-      default:
-        throw new UnsupportedOperationException();
+        default:
+          throw new UnsupportedOperationException();
+        }
+      } finally {
+        msg.release();
       }
     }
-
   }
 
   public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
index b5974f6..ab6c375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.protobuf.Internal.EnumLite;
@@ -28,11 +29,14 @@ public class RpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
 
   private final String name;
+  private final int timeout;
   private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
   private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
 
-  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap) {
+  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
+      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout) {
     this.name = name;
+    this.timeout = timeout;
     this.sendMap = ImmutableMap.copyOf(sendMap);
     this.receiveMap = ImmutableMap.copyOf(receiveMap);
   }
@@ -41,6 +45,13 @@ public class RpcConfig {
     return name;
   }
 
+  public int getTimeout() {
+    return timeout;
+  }
+
+  public boolean hasTimeout() {
+    return timeout > 0;
+  }
   public boolean checkReceive(int rpcType, Class<?> receiveClass) {
     if (RpcConstants.EXTRA_DEBUGGING) {
       logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
@@ -134,17 +145,27 @@ public class RpcConfig {
 
   }
 
-  public static RpcConfigBuilder newBuilder(String name) {
-    return new RpcConfigBuilder(name);
+  public static RpcConfigBuilder newBuilder() {
+    return new RpcConfigBuilder();
   }
 
   public static class RpcConfigBuilder {
-    private final String name;
+    private String name;
+    private int timeout = -1;
     private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
     private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
 
-    private RpcConfigBuilder(String name) {
+    private RpcConfigBuilder() {
+    }
+
+    public RpcConfigBuilder name(String name) {
       this.name = name;
+      return this;
+    }
+
+    public RpcConfigBuilder timeout(int timeoutInSeconds) {
+      this.timeout = timeoutInSeconds;
+      return this;
     }
 
     public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>  RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
@@ -155,7 +176,9 @@ public class RpcConfig {
     }
 
     public RpcConfig build() {
-      return new RpcConfig(name, sendMap, receiveMap);
+      Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
+      Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
+      return new RpcConfig(name, sendMap, receiveMap, timeout);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index 537452e..c12ff7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -23,23 +23,23 @@ import io.netty.channel.ChannelHandlerContext;
 public class RpcExceptionHandler implements ChannelHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
 
-  public RpcExceptionHandler(){
-  }
+  private final String name;
 
+  public RpcExceptionHandler(String name) {
+    this.name = name;
+  }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
     if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
-      logger.warn("Exception with closed channel", cause);
+      logger.warn("Exception occurred with closed channel.  Connection: {}", name, cause);
       return;
     }else{
-      logger.error("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", name, cause);
       ctx.close();
     }
   }
 
-
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index d546db3..f191271 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -50,7 +50,8 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   private final BufferAllocator allocator;
 
   public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
+    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
     this.localIdentity = localEndpoint;
     this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;
@@ -64,14 +65,15 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
 
   @SuppressWarnings("unchecked")
   @Override
-  public ControlConnection initRemoteConnection(Channel channel) {
-    this.connection = new ControlConnection(channel, (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+  public ControlConnection initRemoteConnection(SocketChannel channel) {
+    this.connection = new ControlConnection("control client", channel,
+        (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
     return connection;
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection clientConnection) {
-    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection clientConnection) {
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a7aaa9c..49f0f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import java.util.UUID;
 
@@ -41,8 +41,9 @@ public class ControlConnection extends RemoteConnection {
   private volatile boolean active = false;
   private final UUID id;
 
-  public ControlConnection(Channel channel, RpcBus<RpcType, ControlConnection> bus, BufferAllocator allocator) {
-    super(channel);
+  public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
+      BufferAllocator allocator) {
+    super(channel, name);
     this.bus = bus;
     this.id = UUID.randomUUID();
     this.allocator = allocator;

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 37730e3..f92bb49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -17,7 +17,8 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -34,15 +35,19 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class ControlRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
-      .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("CONTROL")
+        .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
+        .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+        .build();
+  }
 
   public static int RPC_VERSION = 3;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 43089d3..5e405ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -44,7 +44,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   private BufferAllocator allocator;
 
   public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
-    super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup());
     this.handler = handler;
     this.connectionRegistry = connectionRegistry;
     this.allocator = context.getAllocator();
@@ -61,14 +62,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection connection) {
-    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
     return proxyCloseHandler;
   }
 
   @Override
-  public ControlConnection initRemoteConnection(Channel channel) {
-    return new ControlConnection(channel, this, allocator);
+  public ControlConnection initRemoteConnection(SocketChannel channel) {
+    return new ControlConnection("control server", channel, this, allocator);
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
index 1d539a2..44c8ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.RemoteConnection;
@@ -26,8 +26,9 @@ public class BitServerConnection extends RemoteConnection{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
 
   private final BufferAllocator allocator;
-  public BitServerConnection(Channel channel, BufferAllocator allocator) {
-    super(channel);
+
+  public BitServerConnection(SocketChannel channel, BufferAllocator allocator) {
+    super(channel, "data server");
     this.allocator = allocator;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 8e2507b..b8a07c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,21 +47,22 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
 
 
   public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
+    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
     this.remoteEndpoint = remoteEndpoint;
     this.closeHandlerFactory = closeHandlerFactory;
     this.allocator = context.getAllocator();
   }
 
   @Override
-  public DataClientConnection initRemoteConnection(Channel channel) {
+  public DataClientConnection initRemoteConnection(SocketChannel channel) {
     this.connection = new DataClientConnection(channel, this);
     return connection;
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(DataClientConnection clientConnection) {
-    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 3a569db..eb5778d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import java.util.UUID;
 
@@ -36,8 +36,8 @@ public class DataClientConnection extends RemoteConnection{
   private final DataClient client;
   private final UUID id;
 
-  public DataClientConnection(Channel channel, DataClient client) {
-    super(channel);
+  public DataClientConnection(SocketChannel channel, DataClient client) {
+    super(channel, "data client");
     this.client = client;
     // we use a local listener pool unless a global one is provided.
     this.id = UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 807b6c3..c5cf498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -17,7 +17,8 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.BitData.BitServerHandshake;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
@@ -30,10 +31,14 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class DataRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-DATA-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
-      .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("DATA")
+        .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
+        .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+        .build();
+  }
 
   public static int RPC_VERSION = 4;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 6f8e20b..0d4077e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
@@ -57,7 +57,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   private final DataResponseHandler dataHandler;
 
   public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
-    super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup());
     this.context = context;
     this.workBus = workBus;
     this.dataHandler = dataHandler;
@@ -69,13 +70,13 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(BitServerConnection connection) {
-    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
     return proxyCloseHandler;
   }
 
   @Override
-  public BitServerConnection initRemoteConnection(Channel channel) {
+  public BitServerConnection initRemoteConnection(SocketChannel channel) {
     return new BitServerConnection(channel, context.getAllocator());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5e3e937..302be72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,18 +19,22 @@ package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
@@ -63,8 +67,9 @@ public class QueryResultHandler {
   private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
       Maps.newConcurrentMap();
 
-  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
-    return new SubmissionListener(resultsListener);
+  public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
+      UserResultsListener resultsListener) {
+    return new SubmissionListener(connection, resultsListener);
   }
 
   /**
@@ -268,12 +273,31 @@ public class QueryResultHandler {
 
   }
 
+
   private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
-    private UserResultsListener resultsListener;
+    private final UserResultsListener resultsListener;
+    private final RemoteConnection connection;
+    private final ChannelFuture closeFuture;
+    private final ChannelClosedListener closeListener;
 
-    public SubmissionListener(UserResultsListener resultsListener) {
+    public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
       super();
       this.resultsListener = resultsListener;
+      this.connection = connection;
+      this.closeFuture = connection.getChannel().closeFuture();
+      this.closeListener = new ChannelClosedListener();
+      closeFuture.addListener(closeListener);
+    }
+
+    private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        resultsListener.submissionFailed(UserException.connectionError()
+            .message("Connection %s closed unexpectedly.", connection.getName())
+            .build());
+      }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a8bad78..b39a103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack.Builder;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,13 +51,21 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
 
   private boolean supportComplexTypes = true;
 
-  public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+  public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
+      EventLoopGroup eventLoopGroup) {
+    super(
+        UserRpcConfig.getMapping(config),
+        alloc,
+        eventLoopGroup,
+        RpcType.HANDSHAKE,
+        BitToUserHandshake.class,
+        BitToUserHandshake.PARSER,
+        "user client");
     this.supportComplexTypes = supportComplexTypes;
   }
 
   public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
-    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+    send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
   }
 
   public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
@@ -66,6 +74,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
         .setSupportComplexTypes(supportComplexTypes)
+        .setSupportTimeout(true)
         .setCredentials(credentials);
 
     if (props != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 88592d4..ae728d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.rpc.user;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -30,13 +32,17 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class UserRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
-      .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
-      .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
-      .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
-      .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("USER")
+        .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+        .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
+        .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
+        .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
+        .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+        .build();
+  }
 
   public static int RPC_VERSION = 5;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9e929de..b3b7ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.rpc.user;
 
-import com.google.common.io.Closeables;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -56,6 +55,7 @@ import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
 import org.apache.drill.exec.work.user.UserWorker;
 
+import com.google.common.io.Closeables;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
@@ -68,7 +68,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
   public UserServer(DrillConfig config, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
       UserWorker worker) throws DrillbitStartupException {
-    super(UserRpcConfig.MAPPING, alloc.getUnderlyingAllocator(), eventLoopGroup);
+    super(UserRpcConfig.getMapping(config), alloc.getUnderlyingAllocator(), eventLoopGroup);
     this.worker = worker;
     this.alloc = alloc;
     if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
@@ -123,8 +123,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
     private UserSession session;
 
-    public UserClientConnection(Channel channel) {
-      super(channel);
+    public UserClientConnection(SocketChannel channel) {
+      super(channel, "user client");
+    }
+
+    void disableReadTimeout() {
+      getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
     }
 
     void setUser(UserToBitHandshake inbound) throws IOException {
@@ -161,7 +165,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
 
   @Override
-  public UserClientConnection initRemoteConnection(Channel channel) {
+  public UserClientConnection initRemoteConnection(SocketChannel channel) {
     return new UserClientConnection(channel);
   }
 
@@ -186,6 +190,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       public BitToUserHandshake getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
         logger.trace("Handling handshake from user to bit. {}", inbound);
 
+
+        // if timeout is unsupported or is set to false, disable timeout.
+        if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
+          connection.disableReadTimeout();
+          logger.warn("Timeout Disabled as client doesn't support it.", connection.getName());
+        }
+
         BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
             .setRpcVersion(UserRpcConfig.RPC_VERSION);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 49d0c94..b7ef584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,6 +18,9 @@
 package org.apache.drill.exec.work.foreman;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -121,6 +124,9 @@ public class Foreman implements Runnable {
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
+  private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
+  private final ChannelFuture closeFuture;
+
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -139,6 +145,9 @@ public class Foreman implements Runnable {
     this.drillbitContext = drillbitContext;
 
     initiatingClient = connection;
+    this.closeFuture = initiatingClient.getChannel().closeFuture();
+    closeFuture.addListener(closeListener);
+
     queryContext = new QueryContext(connection.getSession(), drillbitContext);
     queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
         stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
@@ -146,6 +155,13 @@ public class Foreman implements Runnable {
     recordNewState(QueryState.PENDING);
   }
 
+  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
+    @Override
+    public void operationComplete(Future<Void> future) throws Exception {
+      cancel();
+    }
+  }
+
   /**
    * Get the QueryContext created for the query.
    *
@@ -603,6 +619,9 @@ public class Foreman implements Runnable {
       logger.info("foreman cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
 
+      // remove the channel disconnected listener (doesn't throw)
+      closeFuture.removeListener(closeListener);
+
       // These are straight forward removals from maps, so they won't throw.
       drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
       drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8006533..d98b97a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -27,6 +27,7 @@ drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
     user: {
+      timeout: 30,
       server: {
         port: 31010
         threads: 1
@@ -36,6 +37,7 @@ drill.exec: {
       }
     },
     bit: {
+      timeout: 30,
       server: {
         port : 31011,
         retry:{

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
index f47e719..c28cc29 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
@@ -42,6 +42,14 @@ public final class GeneralRPCProtos {
      * <code>RESPONSE_FAILURE = 2;</code>
      */
     RESPONSE_FAILURE(2, 2),
+    /**
+     * <code>PING = 3;</code>
+     */
+    PING(3, 3),
+    /**
+     * <code>PONG = 4;</code>
+     */
+    PONG(4, 4),
     ;
 
     /**
@@ -56,6 +64,14 @@ public final class GeneralRPCProtos {
      * <code>RESPONSE_FAILURE = 2;</code>
      */
     public static final int RESPONSE_FAILURE_VALUE = 2;
+    /**
+     * <code>PING = 3;</code>
+     */
+    public static final int PING_VALUE = 3;
+    /**
+     * <code>PONG = 4;</code>
+     */
+    public static final int PONG_VALUE = 4;
 
 
     public final int getNumber() { return value; }
@@ -65,6 +81,8 @@ public final class GeneralRPCProtos {
         case 0: return REQUEST;
         case 1: return RESPONSE;
         case 2: return RESPONSE_FAILURE;
+        case 3: return PING;
+        case 4: return PONG;
         default: return null;
       }
     }
@@ -1972,10 +1990,10 @@ public final class GeneralRPCProtos {
       "rdination_id\030\002 \001(\005\022\020\n\010rpc_type\030\003 \001(\005\"b\n\022" +
       "CompleteRpcMessage\022#\n\006header\030\001 \001(\0132\023.exe" +
       "c.rpc.RpcHeader\022\025\n\rprotobuf_body\030\002 \001(\014\022\020" +
-      "\n\010raw_body\030\003 \001(\014*:\n\007RpcMode\022\013\n\007REQUEST\020\000" +
-      "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002B1\n\033" +
-      "org.apache.drill.exec.protoB\020GeneralRPCP" +
-      "rotosH\001"
+      "\n\010raw_body\030\003 \001(\014*N\n\007RpcMode\022\013\n\007REQUEST\020\000" +
+      "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002\022\010\n\004" +
+      "PING\020\003\022\010\n\004PONG\020\004B1\n\033org.apache.drill.exe" +
+      "c.protoB\020GeneralRPCProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index d587dfc..6fc43bb 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -280,6 +280,8 @@ public final class SchemaUserProtos
 
                 if(message.hasSupportComplexTypes())
                     output.writeBool(6, message.getSupportComplexTypes(), false);
+                if(message.hasSupportTimeout())
+                    output.writeBool(7, message.getSupportTimeout(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message)
             {
@@ -339,6 +341,9 @@ public final class SchemaUserProtos
                         case 6:
                             builder.setSupportComplexTypes(input.readBool());
                             break;
+                        case 7:
+                            builder.setSupportTimeout(input.readBool());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -385,6 +390,7 @@ public final class SchemaUserProtos
                 case 4: return "credentials";
                 case 5: return "properties";
                 case 6: return "supportComplexTypes";
+                case 7: return "supportTimeout";
                 default: return null;
             }
         }
@@ -402,6 +408,7 @@ public final class SchemaUserProtos
             fieldMap.put("credentials", 4);
             fieldMap.put("properties", 5);
             fieldMap.put("supportComplexTypes", 6);
+            fieldMap.put("supportTimeout", 7);
         }
     }