You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/09 17:51:47 UTC

[1/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.

Repository: drill
Updated Branches:
  refs/heads/master d4f9bf2e9 -> 9d2096229


http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index c072a47..c3ff58b 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -1836,6 +1836,16 @@ public final class UserProtos {
      * <code>optional bool support_complex_types = 6 [default = false];</code>
      */
     boolean getSupportComplexTypes();
+
+    // optional bool support_timeout = 7 [default = false];
+    /**
+     * <code>optional bool support_timeout = 7 [default = false];</code>
+     */
+    boolean hasSupportTimeout();
+    /**
+     * <code>optional bool support_timeout = 7 [default = false];</code>
+     */
+    boolean getSupportTimeout();
   }
   /**
    * Protobuf type {@code exec.user.UserToBitHandshake}
@@ -1940,6 +1950,11 @@ public final class UserProtos {
               supportComplexTypes_ = input.readBool();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              supportTimeout_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2088,6 +2103,22 @@ public final class UserProtos {
       return supportComplexTypes_;
     }
 
+    // optional bool support_timeout = 7 [default = false];
+    public static final int SUPPORT_TIMEOUT_FIELD_NUMBER = 7;
+    private boolean supportTimeout_;
+    /**
+     * <code>optional bool support_timeout = 7 [default = false];</code>
+     */
+    public boolean hasSupportTimeout() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional bool support_timeout = 7 [default = false];</code>
+     */
+    public boolean getSupportTimeout() {
+      return supportTimeout_;
+    }
+
     private void initFields() {
       channel_ = org.apache.drill.exec.proto.UserBitShared.RpcChannel.USER;
       supportListening_ = false;
@@ -2095,6 +2126,7 @@ public final class UserProtos {
       credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
       properties_ = org.apache.drill.exec.proto.UserProtos.UserProperties.getDefaultInstance();
       supportComplexTypes_ = false;
+      supportTimeout_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2132,6 +2164,9 @@ public final class UserProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeBool(6, supportComplexTypes_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBool(7, supportTimeout_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2165,6 +2200,10 @@ public final class UserProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(6, supportComplexTypes_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(7, supportTimeout_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -2303,6 +2342,8 @@ public final class UserProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         supportComplexTypes_ = false;
         bitField0_ = (bitField0_ & ~0x00000020);
+        supportTimeout_ = false;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -2363,6 +2404,10 @@ public final class UserProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.supportComplexTypes_ = supportComplexTypes_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.supportTimeout_ = supportTimeout_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -2397,6 +2442,9 @@ public final class UserProtos {
         if (other.hasSupportComplexTypes()) {
           setSupportComplexTypes(other.getSupportComplexTypes());
         }
+        if (other.hasSupportTimeout()) {
+          setSupportTimeout(other.getSupportTimeout());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2799,6 +2847,39 @@ public final class UserProtos {
         return this;
       }
 
+      // optional bool support_timeout = 7 [default = false];
+      private boolean supportTimeout_ ;
+      /**
+       * <code>optional bool support_timeout = 7 [default = false];</code>
+       */
+      public boolean hasSupportTimeout() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional bool support_timeout = 7 [default = false];</code>
+       */
+      public boolean getSupportTimeout() {
+        return supportTimeout_;
+      }
+      /**
+       * <code>optional bool support_timeout = 7 [default = false];</code>
+       */
+      public Builder setSupportTimeout(boolean value) {
+        bitField0_ |= 0x00000040;
+        supportTimeout_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool support_timeout = 7 [default = false];</code>
+       */
+      public Builder clearSupportTimeout() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        supportTimeout_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.user.UserToBitHandshake)
     }
 
@@ -4890,31 +4971,31 @@ public final class UserProtos {
       "\032\023UserBitShared.proto\"&\n\010Property\022\013\n\003key" +
       "\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"9\n\016UserProperties\022" +
       "\'\n\nproperties\030\001 \003(\0132\023.exec.user.Property" +
-      "\"\374\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
+      "\"\234\002\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
       "2\027.exec.shared.RpcChannel:\004USER\022\031\n\021suppo" +
       "rt_listening\030\002 \001(\010\022\023\n\013rpc_version\030\003 \001(\005\022" +
       "1\n\013credentials\030\004 \001(\0132\034.exec.shared.UserC" +
       "redentials\022-\n\nproperties\030\005 \001(\0132\031.exec.us" +
       "er.UserProperties\022$\n\025support_complex_typ",
-      "es\030\006 \001(\010:\005false\"S\n\016RequestResults\022&\n\010que" +
-      "ry_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021max" +
-      "imum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014resu" +
-      "lts_mode\030\001 \001(\0162\033.exec.user.QueryResultsM" +
-      "ode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryTyp" +
-      "e\022\014\n\004plan\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n" +
-      "\013rpc_version\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exe" +
-      "c.user.HandshakeStatus\022\017\n\007errorId\030\004 \001(\t\022" +
-      "\024\n\014errorMessage\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHAN" +
-      "DSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QU",
-      "ERY\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESUL" +
-      "TS\020\005\022\016\n\nQUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026" +
-      "\n\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_" +
-      "LIST\020\t\022\020\n\014QUERY_RESULT\020\n*#\n\020QueryResults" +
-      "Mode\022\017\n\013STREAM_FULL\020\001*^\n\017HandshakeStatus" +
-      "\022\013\n\007SUCCESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022" +
-      "\017\n\013AUTH_FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n" +
-      "\033org.apache.drill.exec.protoB\nUserProtos" +
-      "H\001"
+      "es\030\006 \001(\010:\005false\022\036\n\017support_timeout\030\007 \001(\010" +
+      ":\005false\"S\n\016RequestResults\022&\n\010query_id\030\001 " +
+      "\001(\0132\024.exec.shared.QueryId\022\031\n\021maximum_res" +
+      "ponses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014results_mode" +
+      "\030\001 \001(\0162\033.exec.user.QueryResultsMode\022$\n\004t" +
+      "ype\030\002 \001(\0162\026.exec.shared.QueryType\022\014\n\004pla" +
+      "n\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n\013rpc_ver" +
+      "sion\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exec.user.H" +
+      "andshakeStatus\022\017\n\007errorId\030\004 \001(\t\022\024\n\014error" +
+      "Message\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
+      "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY\020\003\022\020\n" +
+      "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\016\n\n" +
+      "QUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022REQ_ME" +
+      "TA_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIST\020\t\022\020" +
+      "\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMode\022\017\n\013" +
+      "STREAM_FULL\020\001*^\n\017HandshakeStatus\022\013\n\007SUCC" +
+      "ESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022\017\n\013AUTH_" +
+      "FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n\033org.apa" +
+      "che.drill.exec.protoB\nUserProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4938,7 +5019,7 @@ public final class UserProtos {
           internal_static_exec_user_UserToBitHandshake_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_user_UserToBitHandshake_descriptor,
-              new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", });
+              new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", "SupportTimeout", });
           internal_static_exec_user_RequestResults_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_exec_user_RequestResults_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
index df71855..e34b52d 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcMode.java
@@ -24,7 +24,9 @@ public enum RpcMode implements com.dyuproject.protostuff.EnumLite<RpcMode>
 {
     REQUEST(0),
     RESPONSE(1),
-    RESPONSE_FAILURE(2);
+    RESPONSE_FAILURE(2),
+    PING(3),
+    PONG(4);
     
     public final int number;
     
@@ -45,6 +47,8 @@ public enum RpcMode implements com.dyuproject.protostuff.EnumLite<RpcMode>
             case 0: return REQUEST;
             case 1: return RESPONSE;
             case 2: return RESPONSE_FAILURE;
+            case 3: return PING;
+            case 4: return PONG;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
index 67ac4e5..efd8e58 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
@@ -47,6 +47,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
     static final UserToBitHandshake DEFAULT_INSTANCE = new UserToBitHandshake();
 
     static final Boolean DEFAULT_SUPPORT_COMPLEX_TYPES = new Boolean(false);
+    static final Boolean DEFAULT_SUPPORT_TIMEOUT = new Boolean(false);
     
     private RpcChannel channel;
     private Boolean supportListening;
@@ -54,6 +55,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
     private UserCredentials credentials;
     private UserProperties properties;
     private Boolean supportComplexTypes = DEFAULT_SUPPORT_COMPLEX_TYPES;
+    private Boolean supportTimeout = DEFAULT_SUPPORT_TIMEOUT;
 
     public UserToBitHandshake()
     {
@@ -140,6 +142,19 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
         return this;
     }
 
+    // supportTimeout
+
+    public Boolean getSupportTimeout()
+    {
+        return supportTimeout;
+    }
+
+    public UserToBitHandshake setSupportTimeout(Boolean supportTimeout)
+    {
+        this.supportTimeout = supportTimeout;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -214,6 +229,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
                 case 6:
                     message.supportComplexTypes = input.readBool();
                     break;
+                case 7:
+                    message.supportTimeout = input.readBool();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -242,6 +260,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
 
         if(message.supportComplexTypes != null && message.supportComplexTypes != DEFAULT_SUPPORT_COMPLEX_TYPES)
             output.writeBool(6, message.supportComplexTypes, false);
+
+        if(message.supportTimeout != null && message.supportTimeout != DEFAULT_SUPPORT_TIMEOUT)
+            output.writeBool(7, message.supportTimeout, false);
     }
 
     public String getFieldName(int number)
@@ -254,6 +275,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
             case 4: return "credentials";
             case 5: return "properties";
             case 6: return "supportComplexTypes";
+            case 7: return "supportTimeout";
             default: return null;
         }
     }
@@ -273,6 +295,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
         __fieldMap.put("credentials", 4);
         __fieldMap.put("properties", 5);
         __fieldMap.put("supportComplexTypes", 6);
+        __fieldMap.put("supportTimeout", 7);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/GeneralRPC.proto b/protocol/src/main/protobuf/GeneralRPC.proto
index 5538abe..26ab821 100644
--- a/protocol/src/main/protobuf/GeneralRPC.proto
+++ b/protocol/src/main/protobuf/GeneralRPC.proto
@@ -14,6 +14,8 @@ enum RpcMode {
   REQUEST = 0;
   RESPONSE = 1;
   RESPONSE_FAILURE = 2;
+  PING = 3;
+  PONG = 4;
 }
 
 message RpcHeader{

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index 59e22ae..185a646 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -44,6 +44,7 @@ message UserToBitHandshake {
   optional exec.shared.UserCredentials credentials = 4;
   optional UserProperties properties = 5;
   optional bool support_complex_types = 6 [default = false];
+  optional bool support_timeout = 7 [default = false];
 }
 
 message RequestResults {


[2/3] drill git commit: DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure. Add RPC timeout.

Posted by ja...@apache.org.
DRILL-2971, DRILL-2886, DRILL-2778, DRILL-2545: Improve RPC connection detection failure.  Add RPC timeout.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/960f876a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/960f876a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/960f876a

Branch: refs/heads/master
Commit: 960f876a1945eee4eeb0d6a98c5c58dfe2eea1a9
Parents: d4f9bf2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri May 8 08:15:36 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 07:12:23 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../apache/drill/exec/client/DrillClient.java   |   8 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  | 110 ++++++++++----
 .../exec/rpc/BasicClientWithConnection.java     |  21 +--
 .../org/apache/drill/exec/rpc/BasicServer.java  |  78 +++++++---
 .../drill/exec/rpc/CoordinationQueue.java       |   2 +-
 .../apache/drill/exec/rpc/RemoteConnection.java |  31 +++-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  | 151 +++++++++++--------
 .../org/apache/drill/exec/rpc/RpcConfig.java    |  35 ++++-
 .../drill/exec/rpc/RpcExceptionHandler.java     |  12 +-
 .../drill/exec/rpc/control/ControlClient.java   |  14 +-
 .../exec/rpc/control/ControlConnection.java     |   7 +-
 .../exec/rpc/control/ControlRpcConfig.java      |  25 +--
 .../drill/exec/rpc/control/ControlServer.java   |  13 +-
 .../exec/rpc/data/BitServerConnection.java      |   7 +-
 .../apache/drill/exec/rpc/data/DataClient.java  |  11 +-
 .../exec/rpc/data/DataClientConnection.java     |   6 +-
 .../drill/exec/rpc/data/DataRpcConfig.java      |  15 +-
 .../apache/drill/exec/rpc/data/DataServer.java  |  11 +-
 .../drill/exec/rpc/user/QueryResultHandler.java |  36 ++++-
 .../apache/drill/exec/rpc/user/UserClient.java  |  19 ++-
 .../drill/exec/rpc/user/UserRpcConfig.java      |  22 ++-
 .../apache/drill/exec/rpc/user/UserServer.java  |  23 ++-
 .../apache/drill/exec/work/foreman/Foreman.java |  19 +++
 .../src/main/resources/drill-module.conf        |   2 +
 .../drill/exec/proto/GeneralRPCProtos.java      |  26 +++-
 .../drill/exec/proto/SchemaUserProtos.java      |   7 +
 .../org/apache/drill/exec/proto/UserProtos.java | 123 ++++++++++++---
 .../apache/drill/exec/proto/beans/RpcMode.java  |   6 +-
 .../exec/proto/beans/UserToBitHandshake.java    |  23 +++
 protocol/src/main/protobuf/GeneralRPC.proto     |   2 +
 protocol/src/main/protobuf/User.proto           |   1 +
 32 files changed, 626 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be8c7a0..97d5770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -42,7 +42,9 @@ public interface ExecConstants {
   public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
   public static final String SERVICE_NAME = "drill.exec.cluster-id";
   public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+  public static final String BIT_RPC_TIMEOUT = "drill.exec.rpc.bit.timeout";
   public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
+  public static final String USER_RPC_TIMEOUT = "drill.exec.rpc.user.timeout";
   public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
   public static final String FUNCTION_PACKAGES = "drill.exec.functions";
   public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 9924704..136d8c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -20,20 +20,17 @@ package org.apache.drill.exec.client;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
-
-import com.google.common.base.Strings;
 import io.netty.buffer.DrillBuf;
+import io.netty.channel.EventLoopGroup;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Vector;
 
-import io.netty.channel.EventLoopGroup;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -63,6 +60,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -208,7 +206,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     eventLoopGroup = createEventLoop(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-");
-    client = new UserClient(supportComplexTypes, allocator, eventLoopGroup);
+    client = new UserClient(config, supportComplexTypes, allocator, eventLoopGroup);
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
     connect(endpoint);
     connected = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 72ae130..1661f81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -20,18 +20,24 @@ package org.apache.drill.exec.rpc;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 
 import com.google.protobuf.Internal.EnumLite;
@@ -40,7 +46,12 @@ import com.google.protobuf.Parser;
 
 public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
     extends RpcBus<T, R> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
+
+  // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
+  // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
+  // seconds, the idle state handler will send a ping message.
+  private static final double PERCENT_TIMEOUT_BEFORE_SENDING_PING = 0.5;
 
   private final Bootstrap b;
   protected R connection;
@@ -48,18 +59,23 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
   private final Class<HANDSHAKE_RESPONSE> responseClass;
   private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
 
+  private final IdlePingHandler pingHandler;
+
   public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
       Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
     super(rpcMapping);
     this.responseClass = responseClass;
     this.handshakeType = handshakeType;
     this.handshakeParser = handshakeParser;
+    final long timeoutInMillis = rpcMapping.hasTimeout() ? (long) (rpcMapping.getTimeout() * 1000.0 * PERCENT_TIMEOUT_BEFORE_SENDING_PING)
+        : -1;
+    this.pingHandler = rpcMapping.hasTimeout() ? new IdlePingHandler(timeoutInMillis) : null;
 
     b = new Bootstrap() //
         .group(eventLoopGroup) //
         .channel(TransportCheck.getClientSocketChannel()) //
         .option(ChannelOption.ALLOCATOR, alloc) //
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.SO_RCVBUF, 1 << 17) //
         .option(ChannelOption.SO_SNDBUF, 1 << 17) //
@@ -68,40 +84,72 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
-//            logger.debug("initializing client connection.");
+            // logger.debug("initializing client connection.");
             connection = initRemoteConnection(ch);
-            ch.closeFuture().addListener(getCloseHandler(connection));
-
-            ch.pipeline().addLast( //
-                getDecoder(connection.getAllocator()), //
-                new RpcDecoder("c-" + rpcConfig.getName()), //
-                new RpcEncoder("c-" + rpcConfig.getName()), //
-                new ClientHandshakeHandler(), //
-                new InboundHandler(connection), //
-                new RpcExceptionHandler() //
-                );
+
+            ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+            final ChannelPipeline pipe = ch.pipeline();
+
+            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator()));
+            pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName()));
+            pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName()));
+            pipe.addLast("handshake-handler", new ClientHandshakeHandler());
+
+            if(pingHandler != null){
+              pipe.addLast("idle-state-handler", pingHandler);
+            }
+
+            pipe.addLast("message-handler", new InboundHandler(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
           }
         }); //
 
-//    if(TransportCheck.SUPPORTS_EPOLL){
-//      b.option(EpollChannelOption.SO_REUSEPORT, true); //
-//    }
+    // if(TransportCheck.SUPPORTS_EPOLL){
+    // b.option(EpollChannelOption.SO_REUSEPORT, true); //
+    // }
+  }
+
+  private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);
+
+  /**
+   * Handler that watches for situations where we haven't read from the socket in a certain timeout. If we exceed this
+   * timeout, we send a PING message to the server to state that we are still alive.
+   */
+  private class IdlePingHandler extends IdleStateHandler {
+
+    private GenericFutureListener<Future<? super Void>> pingFailedHandler = new GenericFutureListener<Future<? super Void>>() {
+      public void operationComplete(Future<? super Void> future) throws Exception {
+        if (!future.isSuccess()) {
+          logger.error("Unable to maintain connection {}.  Closing connection.", connection.getName());
+          connection.close();
+        }
+      }
+    };
+
+    public IdlePingHandler(long idleWaitInMillis) {
+      super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
+      if (evt.state() == IdleState.WRITER_IDLE) {
+        ctx.writeAndFlush(PING_MESSAGE).addListener(pingFailedHandler);
+      }
+    }
   }
 
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
 
-  public boolean isActive(){
+  public boolean isActive() {
     return connection != null
         && connection.getChannel() != null
-        && connection.getChannel().isActive() ;
+        && connection.getChannel().isActive();
   }
 
   protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
-  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
 
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
-    return new ChannelClosedHandler();
-  }
+  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
 
   public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
       T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
@@ -118,7 +166,8 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return true;
   }
 
-  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue,
+      String host, int port) {
     ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
     b.connect(host, port).addListener(cml.connectionHandler);
   }
@@ -145,16 +194,17 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
-//        logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
+        // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
         try {
           future.get();
           if (future.isSuccess()) {
-            // send a handshake on the current thread.  This is the only time we will send from within the event thread.  We can do this because the connection will not be backed up.
+            // send a handshake on the current thread. This is the only time we will send from within the event thread.
+            // We can do this because the connection will not be backed up.
             send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
           } else {
             l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
           }
-//          logger.debug("Handshake queued for send.");
+          // logger.debug("Handshake queued for send.");
         } catch (Exception ex) {
           l.connectionFailed(FailureType.CONNECTION, ex);
         }
@@ -174,12 +224,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
-//        logger.debug("Handshake received. {}", value);
+        // logger.debug("Handshake received. {}", value);
         try {
           BasicClient.this.validateHandshake(value);
           BasicClient.this.finalizeConnection(value, connection);
           l.connectionSucceeded(connection);
-//          logger.debug("Handshake completed succesfully.");
+          // logger.debug("Handshake completed succesfully.");
         } catch (RpcException ex) {
           l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
         }
@@ -205,7 +255,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
   }
 
-  public void setAutoRead(boolean enableAutoRead){
+  public void setAutoRead(boolean enableAutoRead) {
     connection.setAutoRead(enableAutoRead);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 08819ca..ab54fa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -18,10 +18,8 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.channel.socket.SocketChannel;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -35,16 +33,13 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
 
   private BufferAllocator alloc;
+  private final String connectionName;
 
   public BasicClientWithConnection(RpcConfig rpcMapping, BufferAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
-      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser, String connectionName) {
     super(rpcMapping, alloc.getUnderlyingAllocator(), eventLoopGroup, handshakeType, responseClass, handshakeParser);
     this.alloc = alloc;
-  }
-
-  @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
-    return getCloseHandler(clientConnection.getChannel());
+    this.connectionName = connectionName;
   }
 
   @Override
@@ -55,16 +50,16 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
   protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
 
   @Override
-  public ServerConnection initRemoteConnection(Channel channel) {
-    return new ServerConnection(channel, alloc);
+  public ServerConnection initRemoteConnection(SocketChannel channel) {
+    return new ServerConnection(connectionName, channel, alloc);
   }
 
   public static class ServerConnection extends RemoteConnection{
 
     private final BufferAllocator alloc;
 
-    public ServerConnection(Channel channel, BufferAllocator alloc) {
-      super(channel);
+    public ServerConnection(String name, SocketChannel channel, BufferAllocator alloc) {
+      super(channel, name);
       this.alloc = alloc;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 3a7032b..a148436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -20,12 +20,13 @@ package org.apache.drill.exec.rpc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 
 import java.io.IOException;
 import java.net.BindException;
@@ -44,40 +45,52 @@ import com.google.protobuf.Parser;
  * requests will generate more than one outbound request.
  */
 public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection> extends RpcBus<T, C> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
+  final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+  protected static final String TIMEOUT_HANDLER = "timeout-handler";
 
   private ServerBootstrap b;
   private volatile boolean connect = false;
   private final EventLoopGroup eventLoopGroup;
 
-  public BasicServer(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
     super(rpcMapping);
     this.eventLoopGroup = eventLoopGroup;
-    b = new ServerBootstrap() //
-        .channel(TransportCheck.getServerSocketChannel()) //
-        .option(ChannelOption.SO_BACKLOG, 1000) //
+
+    b = new ServerBootstrap()
+        .channel(TransportCheck.getServerSocketChannel())
+        .option(ChannelOption.SO_BACKLOG, 1000)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30*1000)
         .option(ChannelOption.TCP_NODELAY, true)
         .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
-        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+        .option(ChannelOption.SO_RCVBUF, 1 << 17)
+        .option(ChannelOption.SO_SNDBUF, 1 << 17)
         .group(eventLoopGroup) //
-        .childOption(ChannelOption.ALLOCATOR, alloc) //
-//        .handler(new LoggingHandler(LogLevel.INFO)) //
+        .childOption(ChannelOption.ALLOCATOR, alloc)
+
+        // .handler(new LoggingHandler(LogLevel.INFO))
+
         .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
 //            logger.debug("Starting initialization of server connection.");
             C connection = initRemoteConnection(ch);
-            ch.closeFuture().addListener(getCloseHandler(connection));
-
-            ch.pipeline().addLast( //
-                getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), //
-                new RpcDecoder("s-" + rpcConfig.getName()), //
-                new RpcEncoder("s-" + rpcConfig.getName()), //
-                getHandshakeHandler(connection), new InboundHandler(connection), //
-                new RpcExceptionHandler() //
-                );
+            ch.closeFuture().addListener(getCloseHandler(ch, connection));
+
+            final ChannelPipeline pipe = ch.pipeline();
+            pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
+            pipe.addLast("message-decoder", new RpcDecoder("s-" + rpcConfig.getName()));
+            pipe.addLast("protocol-encoder", new RpcEncoder("s-" + rpcConfig.getName()));
+            pipe.addLast("handshake-handler", getHandshakeHandler(connection));
+
+            if (rpcMapping.hasTimeout()) {
+              pipe.addLast(TIMEOUT_HANDLER,
+                  new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout()));
+            }
+
+            pipe.addLast("message-handler", new InboundHandler(connection));
+            pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName()));
+
             connect = true;
 //            logger.debug("Server connection initialization completed.");
           }
@@ -88,10 +101,33 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
 //     }
   }
 
+  private class LogggingReadTimeoutHandler extends ReadTimeoutHandler {
+
+    private final String name;
+    private final int timeoutSeconds;
+    public LogggingReadTimeoutHandler(String name, int timeoutSeconds) {
+      super(timeoutSeconds);
+      this.name = name;
+      this.timeoutSeconds = timeoutSeconds;
+    }
+
+    @Override
+    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+      logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", name,
+          timeoutSeconds);
+      super.readTimedOut(ctx);
+    }
+
+  }
+
   public OutOfMemoryHandler getOutOfMemoryHandler() {
     return OutOfMemoryHandler.DEFAULT_INSTANCE;
   }
 
+  protected void removeTimeoutHandler() {
+
+  }
+
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
 
   @Override
@@ -141,7 +177,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   }
 
   @Override
-  public C initRemoteConnection(Channel channel) {
+  public C initRemoteConnection(SocketChannel channel) {
     return null;
   }
 
@@ -152,7 +188,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
         b.bind(++port).sync();
         break;
       } catch (Exception e) {
-        if (e instanceof BindException && allowPortHunting){
+        if (e instanceof BindException && allowPortHunting) {
           continue;
         }
         throw new DrillbitStartupException("Could not bind Drillbit", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 1bb65d3..5a5bbab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -86,7 +86,7 @@ public class CoordinationQueue {
         if (future.channel().isActive()) {
            throw new RpcException("Future failed") ;
         } else {
-          throw  new ChannelClosedException();
+          setException(new ChannelClosedException());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index a72dd32..0f095c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.rpc;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ExecutionException;
 
@@ -30,16 +33,31 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   private final WriteManager writeManager;
+  private final String name;
 
   public boolean inEventLoop(){
     return channel.eventLoop().inEventLoop();
   }
 
-  public RemoteConnection(Channel channel) {
+  public RemoteConnection(SocketChannel channel, String name) {
     super();
     this.channel = channel;
+    this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name);
     this.writeManager = new WriteManager();
     channel.pipeline().addLast(new BackPressureHandler());
+    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
+      public void operationComplete(Future<? super Void> future) throws Exception {
+        // this could possibly overrelease but it doesn't matter since we're only going to do this to ensure that we
+        // fail out any pending messages
+        writeManager.disable();
+        writeManager.setWritable(true);
+      }
+    });
+
+  }
+
+  public String getName() {
+    return name;
   }
 
   public abstract BufferAllocator getAllocator();
@@ -72,6 +90,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
    */
   private static class WriteManager{
     private final ResettableBarrier barrier = new ResettableBarrier();
+    private volatile boolean disabled = false;
 
     public WriteManager(){
       barrier.openBarrier();
@@ -82,15 +101,17 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
     }
 
     public void setWritable(boolean isWritable){
-//      logger.debug("Set writable: {}", isWritable);
       if(isWritable){
         barrier.openBarrier();
-      }else{
+      } else if (!disabled) {
         barrier.closeBarrier();
       }
 
     }
 
+    public void disable() {
+      disabled = true;
+    }
   }
 
   private class BackPressureHandler extends ChannelInboundHandlerAdapter{
@@ -107,7 +128,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
   @Override
   public void close() {
     try {
-      channel.close().get();
+      if (channel.isActive()) {
+        channel.close().get();
+      }
     } catch (InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
       // TODO InterruptedException

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b165b53..92ce312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -23,10 +23,12 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.List;
 
@@ -125,25 +127,36 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
   }
 
-  public abstract C initRemoteConnection(Channel channel);
+  public abstract C initRemoteConnection(SocketChannel channel);
 
   public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {
+
+    final InetSocketAddress local;
+    final InetSocketAddress remote;
+    final C clientConnection;
+
+    public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) {
+      this.local = local;
+      this.remote = remote;
+      this.clientConnection = clientConnection;
+    }
+
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel()
-          .remoteAddress());
-      closeQueueDueToChannelClose();
-    }
-  }
+      String msg = String.format("Channel closed %s <--> %s.", local, remote);
+      if (RpcBus.this.isClient()) {
+        logger.info(String.format(msg));
+      } else {
+        queue.channelClosed(new ChannelClosedException(msg));
+      }
 
-  protected void closeQueueDueToChannelClose() {
-    if (this.isClient()) {
-      queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure."));
+      clientConnection.close();
     }
+
   }
 
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
-    return new ChannelClosedHandler();
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
+    return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress());
   }
 
   private class ResponseSenderImpl implements ResponseSender {
@@ -170,8 +183,11 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   }
 
+  private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK);
+
   protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
 
+
     private final C connection;
     public InboundHandler(C connection) {
       super();
@@ -179,67 +195,84 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
 
     @Override
-    protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception {
+    protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception {
       if (!ctx.channel().isOpen()) {
         return;
       }
       if (RpcConstants.EXTRA_DEBUGGING) {
         logger.debug("Received message {}", msg);
       }
-      switch (msg.mode) {
-      case REQUEST: {
-        // handle message and ack.
-        ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
-        try {
-          handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
-        } catch(UserRpcException e){
-          UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
-
-          logger.error("Unexpected Error while handling request message", e);
-
-          OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE_FAILURE, 0, msg.coordinationId,
-            uex.getOrCreatePBError(false));
-          if (RpcConstants.EXTRA_DEBUGGING) {
-            logger.debug("Adding message to outbound buffer. {}", outMessage);
+      final Channel channel = connection.getChannel();
+
+      try{
+        switch (msg.mode) {
+        case REQUEST: {
+          // handle message and ack.
+
+          try {
+            ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId);
+            handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender);
+          } catch (UserRpcException e) {
+            UserException uex = UserException.systemError(e).addIdentity(e.getEndpoint()).build();
+
+            logger.error("Unexpected Error while handling request message", e);
+
+            OutboundRpcMessage outMessage = new OutboundRpcMessage(
+                RpcMode.RESPONSE_FAILURE,
+                0,
+                msg.coordinationId,
+                uex.getOrCreatePBError(false)
+                );
+
+            if (RpcConstants.EXTRA_DEBUGGING) {
+              logger.debug("Adding message to outbound buffer. {}", outMessage);
+            }
+
+            channel.writeAndFlush(outMessage);
           }
-          connection.getChannel().writeAndFlush(outMessage);
+          break;
         }
-        msg.release();  // we release our ownership.  Handle could have taken over ownership.
-        break;
-      }
 
-      case RESPONSE:
-        try{
-        MessageLite m = getResponseDefaultInstance(msg.rpcType);
-        assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
-        RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
-        Parser<?> parser = m.getParserForType();
-        Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        rpcFuture.set(value, msg.dBody);
-        msg.release();  // we release our ownership.  Handle could have taken over ownership.
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
-        }
-        }catch(Exception ex) {
-          logger.error("Failure while handling response.", ex);
-          throw ex;
-        }
-        break;
+        case RESPONSE:
+          try {
+            MessageLite m = getResponseDefaultInstance(msg.rpcType);
+            assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
+            RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+            Parser<?> parser = m.getParserForType();
+            Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+            rpcFuture.set(value, msg.dBody);
+            if (RpcConstants.EXTRA_DEBUGGING) {
+              logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+            }
+          } catch (Exception ex) {
+            logger.error("Failure while handling response.", ex);
+            throw ex;
+          }
+          break;
 
-      case RESPONSE_FAILURE:
-        DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        queue.updateFailedFuture(msg.coordinationId, failure);
-        msg.release();
-        if (RpcConstants.EXTRA_DEBUGGING) {
-          logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
-        }
-        break;
+        case RESPONSE_FAILURE:
+          DrillPBError failure = DrillPBError.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+          queue.updateFailedFuture(msg.coordinationId, failure);
+          if (RpcConstants.EXTRA_DEBUGGING) {
+            logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+          }
+          break;
+
+        case PING:
+          connection.getChannel().writeAndFlush(PONG);
+          break;
+
+        case PONG:
+          // noop.
+          break;
 
-      default:
-        throw new UnsupportedOperationException();
+        default:
+          throw new UnsupportedOperationException();
+        }
+      } finally {
+        msg.release();
       }
     }
-
   }
 
   public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
index b5974f6..ab6c375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.protobuf.Internal.EnumLite;
@@ -28,11 +29,14 @@ public class RpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConfig.class);
 
   private final String name;
+  private final int timeout;
   private final Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap;
   private final Map<Integer, RpcMessageType<?, ?, ?>> receiveMap;
 
-  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap, Map<Integer, RpcMessageType<?, ?, ?>> receiveMap) {
+  private RpcConfig(String name, Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap,
+      Map<Integer, RpcMessageType<?, ?, ?>> receiveMap, int timeout) {
     this.name = name;
+    this.timeout = timeout;
     this.sendMap = ImmutableMap.copyOf(sendMap);
     this.receiveMap = ImmutableMap.copyOf(receiveMap);
   }
@@ -41,6 +45,13 @@ public class RpcConfig {
     return name;
   }
 
+  public int getTimeout() {
+    return timeout;
+  }
+
+  public boolean hasTimeout() {
+    return timeout > 0;
+  }
   public boolean checkReceive(int rpcType, Class<?> receiveClass) {
     if (RpcConstants.EXTRA_DEBUGGING) {
       logger.debug(String.format("Checking reception for rpcType %d and receive class %s.", rpcType, receiveClass));
@@ -134,17 +145,27 @@ public class RpcConfig {
 
   }
 
-  public static RpcConfigBuilder newBuilder(String name) {
-    return new RpcConfigBuilder(name);
+  public static RpcConfigBuilder newBuilder() {
+    return new RpcConfigBuilder();
   }
 
   public static class RpcConfigBuilder {
-    private final String name;
+    private String name;
+    private int timeout = -1;
     private Map<EnumLite, RpcMessageType<?, ?, ?>> sendMap = Maps.newHashMap();
     private Map<Integer, RpcMessageType<?, ?, ?>> receiveMap = Maps.newHashMap();
 
-    private RpcConfigBuilder(String name) {
+    private RpcConfigBuilder() {
+    }
+
+    public RpcConfigBuilder name(String name) {
       this.name = name;
+      return this;
+    }
+
+    public RpcConfigBuilder timeout(int timeoutInSeconds) {
+      this.timeout = timeoutInSeconds;
+      return this;
     }
 
     public <SEND extends MessageLite, RECEIVE extends MessageLite, T extends EnumLite>  RpcConfigBuilder add(T sendEnum, Class<SEND> send, T receiveEnum, Class<RECEIVE> rec) {
@@ -155,7 +176,9 @@ public class RpcConfig {
     }
 
     public RpcConfig build() {
-      return new RpcConfig(name, sendMap, receiveMap);
+      Preconditions.checkArgument(timeout > -1, "Timeout must be a positive number or zero for disabled.");
+      Preconditions.checkArgument(name != null, "RpcConfig name must be set.");
+      return new RpcConfig(name, sendMap, receiveMap, timeout);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index 537452e..c12ff7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -23,23 +23,23 @@ import io.netty.channel.ChannelHandlerContext;
 public class RpcExceptionHandler implements ChannelHandler{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
 
-  public RpcExceptionHandler(){
-  }
+  private final String name;
 
+  public RpcExceptionHandler(String name) {
+    this.name = name;
+  }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
     if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
-      logger.warn("Exception with closed channel", cause);
+      logger.warn("Exception occurred with closed channel.  Connection: {}", name, cause);
       return;
     }else{
-      logger.error("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+      logger.error("Exception in RPC communication.  Connection: {}.  Closing connection.", name, cause);
       ctx.close();
     }
   }
 
-
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index d546db3..f191271 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -50,7 +50,8 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   private final BufferAllocator allocator;
 
   public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler, BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
+    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup(), RpcType.HANDSHAKE, BitControlHandshake.class, BitControlHandshake.PARSER);
     this.localIdentity = localEndpoint;
     this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;
@@ -64,14 +65,15 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
 
   @SuppressWarnings("unchecked")
   @Override
-  public ControlConnection initRemoteConnection(Channel channel) {
-    this.connection = new ControlConnection(channel, (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+  public ControlConnection initRemoteConnection(SocketChannel channel) {
+    this.connection = new ControlConnection("control client", channel,
+        (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
     return connection;
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection clientConnection) {
-    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection clientConnection) {
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a7aaa9c..49f0f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import java.util.UUID;
 
@@ -41,8 +41,9 @@ public class ControlConnection extends RemoteConnection {
   private volatile boolean active = false;
   private final UUID id;
 
-  public ControlConnection(Channel channel, RpcBus<RpcType, ControlConnection> bus, BufferAllocator allocator) {
-    super(channel);
+  public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
+      BufferAllocator allocator) {
+    super(channel, name);
     this.bus = bus;
     this.id = UUID.randomUUID();
     this.allocator = allocator;

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index 37730e3..f92bb49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -17,7 +17,8 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -34,15 +35,19 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class ControlRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-CONTROL-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
-      .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
-      .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("CONTROL")
+        .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
+        .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+        .build();
+  }
 
   public static int RPC_VERSION = 3;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 43089d3..5e405ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.control;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -44,7 +44,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   private BufferAllocator allocator;
 
   public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
-    super(ControlRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+    super(ControlRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup());
     this.handler = handler;
     this.connectionRegistry = connectionRegistry;
     this.allocator = context.getAllocator();
@@ -61,14 +62,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(ControlConnection connection) {
-    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
     return proxyCloseHandler;
   }
 
   @Override
-  public ControlConnection initRemoteConnection(Channel channel) {
-    return new ControlConnection(channel, this, allocator);
+  public ControlConnection initRemoteConnection(SocketChannel channel) {
+    return new ControlConnection("control server", channel, this, allocator);
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
index 1d539a2..44c8ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.RemoteConnection;
@@ -26,8 +26,9 @@ public class BitServerConnection extends RemoteConnection{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
 
   private final BufferAllocator allocator;
-  public BitServerConnection(Channel channel, BufferAllocator allocator) {
-    super(channel);
+
+  public BitServerConnection(SocketChannel channel, BufferAllocator allocator) {
+    super(channel, "data server");
     this.allocator = allocator;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 8e2507b..b8a07c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,21 +47,22 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
 
 
   public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
-    super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
+    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER);
     this.remoteEndpoint = remoteEndpoint;
     this.closeHandlerFactory = closeHandlerFactory;
     this.allocator = context.getAllocator();
   }
 
   @Override
-  public DataClientConnection initRemoteConnection(Channel channel) {
+  public DataClientConnection initRemoteConnection(SocketChannel channel) {
     this.connection = new DataClientConnection(channel, this);
     return connection;
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(DataClientConnection clientConnection) {
-    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 3a569db..eb5778d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
 
 import java.util.UUID;
 
@@ -36,8 +36,8 @@ public class DataClientConnection extends RemoteConnection{
   private final DataClient client;
   private final UUID id;
 
-  public DataClientConnection(Channel channel, DataClient client) {
-    super(channel);
+  public DataClientConnection(SocketChannel channel, DataClient client) {
+    super(channel, "data client");
     this.client = client;
     // we use a local listener pool unless a global one is provided.
     this.id = UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index 807b6c3..c5cf498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -17,7 +17,8 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.BitData.BitServerHandshake;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
@@ -30,10 +31,14 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class DataRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("BIT-DATA-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
-      .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("DATA")
+        .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
+        .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+        .build();
+  }
 
   public static int RPC_VERSION = 4;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 6f8e20b..0d4077e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
@@ -57,7 +57,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   private final DataResponseHandler dataHandler;
 
   public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
-    super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
+    super(DataRpcConfig.getMapping(context.getConfig()), context.getAllocator().getUnderlyingAllocator(), context
+        .getBitLoopGroup());
     this.context = context;
     this.workBus = workBus;
     this.dataHandler = dataHandler;
@@ -69,13 +70,13 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   }
 
   @Override
-  protected GenericFutureListener<ChannelFuture> getCloseHandler(BitServerConnection connection) {
-    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
     return proxyCloseHandler;
   }
 
   @Override
-  public BitServerConnection initRemoteConnection(Channel channel) {
+  public BitServerConnection initRemoteConnection(SocketChannel channel) {
     return new BitServerConnection(channel, context.getAllocator());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5e3e937..302be72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -19,18 +19,22 @@ package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
@@ -63,8 +67,9 @@ public class QueryResultHandler {
   private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
       Maps.newConcurrentMap();
 
-  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
-    return new SubmissionListener(resultsListener);
+  public RpcOutcomeListener<QueryId> getWrappedListener(RemoteConnection connection,
+      UserResultsListener resultsListener) {
+    return new SubmissionListener(connection, resultsListener);
   }
 
   /**
@@ -268,12 +273,31 @@ public class QueryResultHandler {
 
   }
 
+
   private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
-    private UserResultsListener resultsListener;
+    private final UserResultsListener resultsListener;
+    private final RemoteConnection connection;
+    private final ChannelFuture closeFuture;
+    private final ChannelClosedListener closeListener;
 
-    public SubmissionListener(UserResultsListener resultsListener) {
+    public SubmissionListener(RemoteConnection connection, UserResultsListener resultsListener) {
       super();
       this.resultsListener = resultsListener;
+      this.connection = connection;
+      this.closeFuture = connection.getChannel().closeFuture();
+      this.closeListener = new ChannelClosedListener();
+      closeFuture.addListener(closeListener);
+    }
+
+    private class ChannelClosedListener implements GenericFutureListener<Future<Void>> {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        resultsListener.submissionFailed(UserException.connectionError()
+            .message("Connection %s closed unexpectedly.", connection.getName())
+            .build());
+      }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a8bad78..b39a103 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack.Builder;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.HandshakeStatus;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -51,13 +51,21 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
 
   private boolean supportComplexTypes = true;
 
-  public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+  public UserClient(DrillConfig config, boolean supportComplexTypes, BufferAllocator alloc,
+      EventLoopGroup eventLoopGroup) {
+    super(
+        UserRpcConfig.getMapping(config),
+        alloc,
+        eventLoopGroup,
+        RpcType.HANDSHAKE,
+        BitToUserHandshake.class,
+        BitToUserHandshake.PARSER,
+        "user client");
     this.supportComplexTypes = supportComplexTypes;
   }
 
   public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
-    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+    send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
   }
 
   public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
@@ -66,6 +74,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
         .setSupportComplexTypes(supportComplexTypes)
+        .setSupportTimeout(true)
         .setCredentials(credentials);
 
     if (props != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 88592d4..ae728d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.rpc.user;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -30,13 +32,17 @@ import org.apache.drill.exec.rpc.RpcConfig;
 public class UserRpcConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
 
-  public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
-      .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
-      .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
-      .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
-      .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
-      .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
-      .build();
+  public static RpcConfig getMapping(DrillConfig config) {
+    return RpcConfig.newBuilder()
+        .name("USER")
+        .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
+        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+        .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
+        .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
+        .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
+        .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+        .build();
+  }
 
   public static int RPC_VERSION = 5;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9e929de..b3b7ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.rpc.user;
 
-import com.google.common.io.Closeables;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -56,6 +55,7 @@ import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
 import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
 import org.apache.drill.exec.work.user.UserWorker;
 
+import com.google.common.io.Closeables;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
@@ -68,7 +68,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
   public UserServer(DrillConfig config, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
       UserWorker worker) throws DrillbitStartupException {
-    super(UserRpcConfig.MAPPING, alloc.getUnderlyingAllocator(), eventLoopGroup);
+    super(UserRpcConfig.getMapping(config), alloc.getUnderlyingAllocator(), eventLoopGroup);
     this.worker = worker;
     this.alloc = alloc;
     if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
@@ -123,8 +123,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
     private UserSession session;
 
-    public UserClientConnection(Channel channel) {
-      super(channel);
+    public UserClientConnection(SocketChannel channel) {
+      super(channel, "user client");
+    }
+
+    void disableReadTimeout() {
+      getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
     }
 
     void setUser(UserToBitHandshake inbound) throws IOException {
@@ -161,7 +165,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
 
   @Override
-  public UserClientConnection initRemoteConnection(Channel channel) {
+  public UserClientConnection initRemoteConnection(SocketChannel channel) {
     return new UserClientConnection(channel);
   }
 
@@ -186,6 +190,13 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       public BitToUserHandshake getHandshakeResponse(UserToBitHandshake inbound) throws Exception {
         logger.trace("Handling handshake from user to bit. {}", inbound);
 
+
+        // if timeout is unsupported or is set to false, disable timeout.
+        if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
+          connection.disableReadTimeout();
+          logger.warn("Timeout Disabled as client doesn't support it.", connection.getName());
+        }
+
         BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
             .setRpcVersion(UserRpcConfig.RPC_VERSION);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 49d0c94..b7ef584 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,6 +18,9 @@
 package org.apache.drill.exec.work.foreman;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -121,6 +124,9 @@ public class Foreman implements Runnable {
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
+  private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
+  private final ChannelFuture closeFuture;
+
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -139,6 +145,9 @@ public class Foreman implements Runnable {
     this.drillbitContext = drillbitContext;
 
     initiatingClient = connection;
+    this.closeFuture = initiatingClient.getChannel().closeFuture();
+    closeFuture.addListener(closeListener);
+
     queryContext = new QueryContext(connection.getSession(), drillbitContext);
     queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
         stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
@@ -146,6 +155,13 @@ public class Foreman implements Runnable {
     recordNewState(QueryState.PENDING);
   }
 
+  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
+    @Override
+    public void operationComplete(Future<Void> future) throws Exception {
+      cancel();
+    }
+  }
+
   /**
    * Get the QueryContext created for the query.
    *
@@ -603,6 +619,9 @@ public class Foreman implements Runnable {
       logger.info("foreman cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
 
+      // remove the channel disconnected listener (doesn't throw)
+      closeFuture.removeListener(closeListener);
+
       // These are straight forward removals from maps, so they won't throw.
       drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
       drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8006533..d98b97a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -27,6 +27,7 @@ drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
     user: {
+      timeout: 30,
       server: {
         port: 31010
         threads: 1
@@ -36,6 +37,7 @@ drill.exec: {
       }
     },
     bit: {
+      timeout: 30,
       server: {
         port : 31011,
         retry:{

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
index f47e719..c28cc29 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/GeneralRPCProtos.java
@@ -42,6 +42,14 @@ public final class GeneralRPCProtos {
      * <code>RESPONSE_FAILURE = 2;</code>
      */
     RESPONSE_FAILURE(2, 2),
+    /**
+     * <code>PING = 3;</code>
+     */
+    PING(3, 3),
+    /**
+     * <code>PONG = 4;</code>
+     */
+    PONG(4, 4),
     ;
 
     /**
@@ -56,6 +64,14 @@ public final class GeneralRPCProtos {
      * <code>RESPONSE_FAILURE = 2;</code>
      */
     public static final int RESPONSE_FAILURE_VALUE = 2;
+    /**
+     * <code>PING = 3;</code>
+     */
+    public static final int PING_VALUE = 3;
+    /**
+     * <code>PONG = 4;</code>
+     */
+    public static final int PONG_VALUE = 4;
 
 
     public final int getNumber() { return value; }
@@ -65,6 +81,8 @@ public final class GeneralRPCProtos {
         case 0: return REQUEST;
         case 1: return RESPONSE;
         case 2: return RESPONSE_FAILURE;
+        case 3: return PING;
+        case 4: return PONG;
         default: return null;
       }
     }
@@ -1972,10 +1990,10 @@ public final class GeneralRPCProtos {
       "rdination_id\030\002 \001(\005\022\020\n\010rpc_type\030\003 \001(\005\"b\n\022" +
       "CompleteRpcMessage\022#\n\006header\030\001 \001(\0132\023.exe" +
       "c.rpc.RpcHeader\022\025\n\rprotobuf_body\030\002 \001(\014\022\020" +
-      "\n\010raw_body\030\003 \001(\014*:\n\007RpcMode\022\013\n\007REQUEST\020\000" +
-      "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002B1\n\033" +
-      "org.apache.drill.exec.protoB\020GeneralRPCP" +
-      "rotosH\001"
+      "\n\010raw_body\030\003 \001(\014*N\n\007RpcMode\022\013\n\007REQUEST\020\000" +
+      "\022\014\n\010RESPONSE\020\001\022\024\n\020RESPONSE_FAILURE\020\002\022\010\n\004" +
+      "PING\020\003\022\010\n\004PONG\020\004B1\n\033org.apache.drill.exe" +
+      "c.protoB\020GeneralRPCProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/960f876a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index d587dfc..6fc43bb 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -280,6 +280,8 @@ public final class SchemaUserProtos
 
                 if(message.hasSupportComplexTypes())
                     output.writeBool(6, message.getSupportComplexTypes(), false);
+                if(message.hasSupportTimeout())
+                    output.writeBool(7, message.getSupportTimeout(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message)
             {
@@ -339,6 +341,9 @@ public final class SchemaUserProtos
                         case 6:
                             builder.setSupportComplexTypes(input.readBool());
                             break;
+                        case 7:
+                            builder.setSupportTimeout(input.readBool());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -385,6 +390,7 @@ public final class SchemaUserProtos
                 case 4: return "credentials";
                 case 5: return "properties";
                 case 6: return "supportComplexTypes";
+                case 7: return "supportTimeout";
                 default: return null;
             }
         }
@@ -402,6 +408,7 @@ public final class SchemaUserProtos
             fieldMap.put("credentials", 4);
             fieldMap.put("properties", 5);
             fieldMap.put("supportComplexTypes", 6);
+            fieldMap.put("supportTimeout", 7);
         }
     }
 


[3/3] drill git commit: DRILL-2897: Avoid parallelization when Drill is running limit 0 query.

Posted by ja...@apache.org.
DRILL-2897: Avoid parallelization when Drill is running limit 0 query.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9d209622
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9d209622
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9d209622

Branch: refs/heads/master
Commit: 9d209622934cb2be929a1591fd6a059165e1ac39
Parents: 960f876
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Apr 26 22:35:57 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 9 08:03:59 2015 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/PlannerSettings.java  |   7 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |  57 +++----
 .../planner/sql/handlers/FindLimit0Visitor.java | 119 +++++++++++++++
 .../org/apache/drill/TestExampleQueries.java    |   1 +
 .../java/org/apache/drill/TestTpchLimit0.java   | 149 +++++++++++++++++++
 5 files changed, 305 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 7d8dd97..d4fb4f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -33,6 +33,7 @@ public class PlannerSettings implements Context{
 
   private int numEndPoints = 0;
   private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
+  private boolean forceSingleMode;
 
   public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE;
   public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 1024;
@@ -81,7 +82,11 @@ public class PlannerSettings implements Context{
   }
 
   public boolean isSingleMode() {
-    return options.getOption(EXCHANGE.getOptionName()).bool_val;
+    return forceSingleMode || options.getOption(EXCHANGE.getOptionName()).bool_val;
+  }
+
+  public void forceSingleMode() {
+    forceSingleMode = true;
   }
 
   public int numEndPoints() {

http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index eda1b5f..188aaa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -22,19 +22,23 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.rules.ProjectRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.TypedSqlNode;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
@@ -55,7 +59,6 @@ import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
 import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
@@ -77,13 +80,6 @@ import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.hep.HepPlanner;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.apache.calcite.sql.SqlNode;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
@@ -215,23 +211,23 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     return rel;
   }
 
-  protected RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException{
-     /* Traverse the tree to do the following pre-processing tasks:
-     * 1. replace the convert_from, convert_to function to actual implementations
-     * Eg: convert_from(EXPR, 'JSON') be converted to convert_fromjson(EXPR);
-     * TODO: Ideally all function rewrites would move here instead of DrillOptiq
+  protected RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException {
+    /*
+     * Traverse the tree to do the following pre-processing tasks: 1. replace the convert_from, convert_to function to
+     * actual implementations Eg: convert_from(EXPR, 'JSON') be converted to convert_fromjson(EXPR); TODO: Ideally all
+     * function rewrites would move here instead of DrillOptiq.
      *
-     * 2. see where the tree contains unsupported functions;
-     * throw SqlUnsupportedException if there is
+     * 2. see where the tree contains unsupported functions; throw SqlUnsupportedException if there is any.
      */
 
-     PreProcessLogicalRel visitor = PreProcessLogicalRel.createVisitor(planner.getTypeFactory(), context.getDrillOperatorTable());
-     try {
-        rel = rel.accept(visitor);
-     } catch(UnsupportedOperationException ex) {
-        visitor.convertException();
-       throw ex;
-     }
+    PreProcessLogicalRel visitor = PreProcessLogicalRel.createVisitor(planner.getTypeFactory(),
+        context.getDrillOperatorTable());
+    try {
+      rel = rel.accept(visitor);
+    } catch (UnsupportedOperationException ex) {
+      visitor.convertException();
+      throw ex;
+    }
 
     return rel;
   }
@@ -243,9 +239,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       if (convertedRelNode instanceof DrillStoreRel) {
         throw new UnsupportedOperationException();
       } else {
+
+        // If the query contains a limit 0 clause, disable distributed mode since it is overkill for determining schema.
+        if (FindLimit0Visitor.containsLimit0(convertedRelNode)) {
+          context.getPlannerSettings().forceSingleMode();
+        }
+
         // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary.
-        DrillRel topPreservedNameProj = addRenamedProject((DrillRel)convertedRelNode, validatedRowType);
-        return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(), topPreservedNameProj);
+        DrillRel topPreservedNameProj = addRenamedProject((DrillRel) convertedRelNode, validatedRowType);
+        return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(),
+            topPreservedNameProj);
       }
     } catch (RelOptPlanner.CannotPlanException ex) {
       logger.error(ex.getMessage());

http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
new file mode 100644
index 0000000..d2c5fa6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
+
+/**
+ * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
+ * inform the planner settings that this plan should be run as a single node plan to reduce the overhead associated with
+ * executing a schema-only query.
+ */
+public class FindLimit0Visitor extends RelShuttleImpl {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class);
+
+  private boolean contains = false;
+
+  public static boolean containsLimit0(RelNode rel) {
+    FindLimit0Visitor visitor = new FindLimit0Visitor();
+    rel.accept(visitor);
+    return visitor.isContains();
+  }
+
+  private FindLimit0Visitor() {
+  }
+
+  boolean isContains() {
+    return contains;
+  }
+
+  private boolean isLimit0(RexNode fetch) {
+    if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
+      RexLiteral l = (RexLiteral) fetch;
+      switch (l.getTypeName()) {
+      case BIGINT:
+      case INTEGER:
+      case DECIMAL:
+        if (((long) l.getValue2()) == 0) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public RelNode visit(LogicalSort sort) {
+    if (isLimit0(sort.fetch)) {
+      contains = true;
+      return sort;
+    }
+
+    return super.visit(sort);
+  }
+
+  @Override
+  public RelNode visit(RelNode other) {
+    if (other instanceof DrillLimitRel) {
+      if (isLimit0(((DrillLimitRel) other).getFetch())) {
+        contains = true;
+        return other;
+      }
+    }
+
+    return super.visit(other);
+  }
+
+  // The following set of RelNodes should terminate a search for the limit 0 pattern as they want convey its meaning.
+
+  @Override
+  public RelNode visit(LogicalAggregate aggregate) {
+    return aggregate;
+  }
+
+  @Override
+  public RelNode visit(LogicalIntersect intersect) {
+    return intersect;
+  }
+
+  @Override
+  public RelNode visit(LogicalJoin join) {
+    return join;
+  }
+
+  @Override
+  public RelNode visit(LogicalMinus minus) {
+    return minus;
+  }
+
+  @Override
+  public RelNode visit(LogicalUnion union) {
+    return union;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 7e07500..18336cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -284,6 +284,7 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  @Ignore("DRILL-3004")
   public void testJoin() throws Exception{
     test("alter session set `planner.enable_hashjoin` = false");
     test("SELECT\n" +

http://git-wip-us.apache.org/repos/asf/drill/blob/9d209622/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
new file mode 100644
index 0000000..22471c8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestTpchLimit0 extends BaseTestQuery{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchLimit0.class);
+
+  private void testLimitZero(String fileName) throws Exception {
+    String query = getFile(fileName);
+    query = "ALTER SESSION SET `planner.slice_target` = 1; select * from \n(" + query.replace(";", ")xyz limit 0;");
+    test(query);
+  }
+
+  @Test
+  public void tpch01() throws Exception{
+    testLimitZero("queries/tpch/01.sql");
+  }
+
+  @Test
+  @Ignore // DRILL-512
+  public void tpch02() throws Exception{
+    testLimitZero("queries/tpch/02.sql");
+  }
+
+  @Test
+  public void tpch03() throws Exception{
+    testLimitZero("queries/tpch/03.sql");
+  }
+
+  @Test
+  public void tpch04() throws Exception{
+    testLimitZero("queries/tpch/04.sql");
+  }
+
+  @Test
+  public void tpch05() throws Exception{
+    testLimitZero("queries/tpch/05.sql");
+  }
+
+  @Test
+  public void tpch06() throws Exception{
+    testLimitZero("queries/tpch/06.sql");
+  }
+
+  @Test
+  public void tpch07() throws Exception{
+    testLimitZero("queries/tpch/07.sql");
+  }
+
+  @Test
+  public void tpch08() throws Exception{
+    testLimitZero("queries/tpch/08.sql");
+  }
+
+  @Test
+  public void tpch09() throws Exception{
+    testLimitZero("queries/tpch/09.sql");
+  }
+
+  @Test
+  public void tpch10() throws Exception{
+    testLimitZero("queries/tpch/10.sql");
+  }
+
+  @Test
+  @Ignore // Cartesian problem
+  public void tpch11() throws Exception{
+    testLimitZero("queries/tpch/11.sql");
+  }
+
+  @Test
+  public void tpch12() throws Exception{
+    testLimitZero("queries/tpch/12.sql");
+  }
+
+  @Test
+  public void tpch13() throws Exception{
+    testLimitZero("queries/tpch/13.sql");
+  }
+
+  @Test
+  public void tpch14() throws Exception{
+    testLimitZero("queries/tpch/14.sql");
+  }
+
+  @Test
+  @Ignore //
+  public void tpch15() throws Exception{
+    testLimitZero("queries/tpch/15.sql");
+  }
+
+  @Test
+  @Ignore // invalid plan, due to Nulls value NOT IN sub-q
+  public void tpch16() throws Exception{
+    testLimitZero("queries/tpch/16.sql");
+  }
+
+  @Test
+  @Ignore //
+  public void tpch17() throws Exception{
+    testLimitZero("queries/tpch/17.sql");
+  }
+
+  @Test
+  public void tpch18() throws Exception{
+    testLimitZero("queries/tpch/18.sql");
+  }
+
+  @Test
+  @Ignore // DRILL-519
+  public void tpch19() throws Exception{
+    testLimitZero("queries/tpch/19.sql");
+  }
+
+  @Test
+  public void tpch20() throws Exception{
+    testLimitZero("queries/tpch/20.sql");
+  }
+
+  @Test
+  @Ignore
+  public void tpch21() throws Exception{
+    testLimitZero("queries/tpch/21.sql");
+  }
+
+  @Test
+  @Ignore // DRILL-518
+  public void tpch22() throws Exception{
+    testLimitZero("queries/tpch/22.sql");
+  }
+}