You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:45 UTC

[4/6] drill git commit: DRILL-4335: Apache Drill should support network encryption.

DRILL-4335: Apache Drill should support network encryption.

    NOTE: This pull request provides support for on-wire encryption using SASL framework. The communication channel that are covered are:
    1) Between Drill JDBC client and Drillbit.
    2) Between Drillbit to Drillbit i.e. control/data channels.
    3) It has UI change to view encryption is enabled on which network channel and number of encrypted/unencrypted connections for
       user/control/data connections.

close apache/drill#773


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ce8bbc01
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ce8bbc01
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ce8bbc01

Branch: refs/heads/master
Commit: ce8bbc01cfde7d714185919be2ca2923d19ea890
Parents: 416ec70
Author: Sorabh Hamirwasia <sh...@maprtech.com>
Authored: Wed Feb 1 18:44:21 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat May 20 16:16:21 2017 -0700

----------------------------------------------------------------------
 .../drill/common/config/DrillProperties.java    |  16 +
 .../org/apache/drill/exec/ExecConstants.java    |   5 +
 .../exec/rpc/AbstractClientConnection.java      |  42 +-
 .../exec/rpc/AbstractConnectionConfig.java      |  11 +
 .../drill/exec/rpc/AbstractRpcMetrics.java      |  52 ++
 .../exec/rpc/AbstractServerConnection.java      |  55 +-
 .../drill/exec/rpc/BitConnectionConfig.java     |  32 +-
 .../apache/drill/exec/rpc/ConnectionConfig.java |   3 +
 .../drill/exec/rpc/control/ControlClient.java   |  15 +-
 .../exec/rpc/control/ControlConnection.java     |  45 +-
 .../exec/rpc/control/ControlRpcMetrics.java     |  84 +++
 .../drill/exec/rpc/control/ControlServer.java   |   8 +-
 .../drill/exec/rpc/control/ControllerImpl.java  |   3 +
 .../apache/drill/exec/rpc/data/DataClient.java  |  21 +-
 .../exec/rpc/data/DataClientConnection.java     |  16 +-
 .../exec/rpc/data/DataConnectionCreator.java    |   5 +-
 .../drill/exec/rpc/data/DataRpcMetrics.java     |  85 +++
 .../apache/drill/exec/rpc/data/DataServer.java  |   8 +-
 .../exec/rpc/data/DataServerConnection.java     |   9 +
 .../security/AuthenticationOutcomeListener.java | 104 +++-
 .../rpc/security/AuthenticatorProviderImpl.java |   1 -
 .../security/ClientAuthenticatorProvider.java   |   2 +-
 .../drill/exec/rpc/security/SaslProperties.java |  70 +++
 .../security/ServerAuthenticationHandler.java   |  96 +++-
 .../rpc/security/kerberos/KerberosFactory.java  |  24 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |  80 ++-
 .../exec/rpc/user/UserConnectionConfig.java     |  39 +-
 .../drill/exec/rpc/user/UserRpcMetrics.java     |  84 +++
 .../apache/drill/exec/rpc/user/UserServer.java  |  38 +-
 .../user/security/UserAuthenticatorFactory.java |   6 +
 .../drill/exec/server/rest/DrillRoot.java       |  21 +-
 .../drill/exec/service/ServiceEngine.java       |  50 +-
 .../src/main/resources/drill-module.conf        |   8 +
 .../java-exec/src/main/resources/rest/index.ftl |  21 +
 .../drill/exec/rpc/data/TestBitBitKerberos.java | 326 ++++++-----
 .../apache/drill/exec/rpc/data/TestBitRpc.java  |  18 +-
 .../drill/exec/rpc/security/KerberosHelper.java | 150 +++++
 .../rpc/user/security/TestUserBitKerberos.java  | 163 ++----
 .../security/TestUserBitKerberosEncryption.java | 539 ++++++++++++++++++
 .../exec/rpc/AbstractRemoteConnection.java      | 108 +++-
 .../org/apache/drill/exec/rpc/BasicClient.java  |  15 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |  16 +-
 .../drill/exec/rpc/ChunkCreationHandler.java    |  99 ++++
 .../apache/drill/exec/rpc/ClientConnection.java |   5 +-
 .../drill/exec/rpc/EncryptionContext.java       |  37 ++
 .../drill/exec/rpc/EncryptionContextImpl.java   |  95 ++++
 .../apache/drill/exec/rpc/RemoteConnection.java |   2 +
 .../org/apache/drill/exec/rpc/RpcConstants.java |  27 +-
 .../org/apache/drill/exec/rpc/RpcEncoder.java   |   2 +-
 .../org/apache/drill/exec/rpc/RpcMetrics.java   |  29 +
 .../org/apache/drill/exec/rpc/SaslCodec.java    |  34 ++
 .../drill/exec/rpc/SaslDecryptionHandler.java   | 160 ++++++
 .../drill/exec/rpc/SaslEncryptionHandler.java   | 177 ++++++
 .../apache/drill/exec/rpc/ServerConnection.java |   4 +-
 pom.xml                                         |   2 +
 protocol/readme.txt                             |  16 +-
 .../drill/exec/proto/SchemaUserProtos.java      |  14 +
 .../org/apache/drill/exec/proto/UserProtos.java | 547 ++++++++++++-------
 .../exec/proto/beans/BitToUserHandshake.java    |  44 ++
 .../drill/exec/proto/beans/SaslSupport.java     |   4 +-
 protocol/src/main/protobuf/User.proto           |   3 +
 61 files changed, 3122 insertions(+), 673 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
index c7e6e29..75064e0 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.common.config;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -59,6 +60,12 @@ public final class DrillProperties extends Properties {
 
   public static final String KEYTAB = "keytab";
 
+  public static final String SASL_ENCRYPT = "sasl_encrypt";
+
+  // Should only be used for testing backward compatibility
+  @VisibleForTesting
+  public static final String TEST_SASL_LEVEL = "test_sasl_level";
+
   // for subject that has pre-authenticated to KDC (AS) i.e. required credentials are populated in
   // Subject's credentials set
   public static final String KERBEROS_FROM_SUBJECT = "from_subject";
@@ -110,6 +117,15 @@ public final class DrillProperties extends Properties {
     }
   }
 
+  public void merge(final Map<String, String> overrides) {
+    if (overrides == null) {
+      return;
+    }
+    for (final String key : overrides.keySet()) {
+      setProperty(key.toLowerCase(), overrides.get(key));
+    }
+  }
+
   /**
    * Returns a map of keys and values in this property list where the key and its corresponding value are strings,
    * including distinct keys in the default property list if a key of the same name has not already been found from

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index e291524..007e39a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -116,6 +116,11 @@ public interface ExecConstants {
   String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
   String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
   String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
+  String USER_ENCRYPTION_SASL_ENABLED = "drill.exec.security.user.encryption.sasl.enabled";
+  String USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.user.encryption.sasl.max_wrapped_size";
+  String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
+  String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
+
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
index 055ea59..ab13c2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
@@ -30,8 +30,13 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
 
   private SaslClient saslClient;
 
+  public AbstractClientConnection(SocketChannel channel, String name,
+                                  EncryptionContext encryptContext) {
+    super(channel, name, encryptContext);
+  }
+
   public AbstractClientConnection(SocketChannel channel, String name) {
-    super(channel, name);
+    this(channel, name, new EncryptionContextImpl());
   }
 
   protected abstract Logger getLogger();
@@ -40,6 +45,25 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
   public void setSaslClient(final SaslClient saslClient) {
     checkState(this.saslClient == null);
     this.saslClient = saslClient;
+
+    // If encryption is enabled set the backend wrapper instance corresponding to this SaslClient in the connection
+    // object. This is later used to do wrap/unwrap in handlers.
+    if (isEncryptionEnabled()) {
+      saslCodec = new SaslCodec() {
+
+        @Override
+        public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+          checkState(saslClient != null);
+          return saslClient.wrap(data, offset, len);
+        }
+
+        @Override
+        public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+          checkState(saslClient != null);
+          return saslClient.unwrap(data, offset, len);
+        }
+      };
+    }
   }
 
   @Override
@@ -49,7 +73,7 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
   }
 
   @Override
-  public void close() {
+  public void disposeSaslClient() {
     try {
       if (saslClient != null) {
         saslClient.dispose();
@@ -58,6 +82,18 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
     } catch (final SaslException e) {
       getLogger().warn("Unclean disposal", e);
     }
-    super.close();
+  }
+
+  @Override
+  public void channelClosed(RpcException ex) {
+    // This will be triggered from Netty when a channel is closed. We should cleanup here
+    // as this will handle case for both client closing the connection or server closing the
+    // connection.
+    disposeSaslClient();
+
+    // Decrease the connection counter here since the close handler will be triggered
+    // for all the types of connection
+    decConnectionCounter();
+    super.channelClosed(ex);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
index fb815ab..76c17e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
@@ -26,10 +26,12 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
 
   private final BufferAllocator allocator;
   private final BootStrapContext context;
+  protected EncryptionContext encryptionContext;
 
   protected AbstractConnectionConfig(BufferAllocator allocator, BootStrapContext context) {
     this.allocator = allocator;
     this.context = context;
+    this.encryptionContext = new EncryptionContextImpl();
   }
 
   @Override
@@ -46,4 +48,13 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
   public AuthenticatorProvider getAuthProvider() {
     return context.getAuthProvider();
   }
+
+  @Override
+  public boolean isEncryptionEnabled() {
+    return encryptionContext.isEncryptionEnabled();
+  }
+
+  public EncryptionContext getEncryptionCtxt() {
+    return encryptionContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
new file mode 100644
index 0000000..a1fd308
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.codahale.metrics.Gauge;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+
+public abstract class AbstractRpcMetrics implements RpcMetrics {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRpcMetrics.class);
+
+  protected boolean useEncryptedCounter;
+
+  public static final String CONNECTION_COUNTER_PREFIX = "drill.connections.rpc.";
+
+  public static final String ALLOCATOR_METRICS_PREFIX = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
+
+  protected void registerAllocatorMetrics(final BufferAllocator allocator, final String metricPrefix) {
+    DrillMetrics.register(metricPrefix + "used", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return allocator.getAllocatedMemory();
+      }
+    });
+
+    DrillMetrics.register(metricPrefix + "peak", new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return allocator.getPeakMemoryAllocation();
+      }
+    });
+  }
+
+  public abstract void initialize(boolean useEncryptedCounter, BufferAllocator allocator);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
index db87bfc..f10f6d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.channel.socket.SocketChannel;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.security.SaslProperties;
 import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -42,7 +43,7 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
 
   public AbstractServerConnection(SocketChannel channel, String name, ConnectionConfig config,
                                   RequestHandler<S> handler) {
-    super(channel, name);
+    super(channel, name, config.getEncryptionCtxt());
     this.config = config;
     this.currentHandler = handler;
   }
@@ -65,8 +66,8 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
     try {
       this.saslServer = config.getAuthProvider()
           .getAuthenticatorFactory(mechanismName)
-          .createSaslServer(UserGroupInformation.getLoginUser(), null
-              /** properties; default QOP is auth */);
+          .createSaslServer(UserGroupInformation.getLoginUser(),
+              SaslProperties.getSaslProperties(isEncryptionEnabled(), getMaxWrappedSize()));
     } catch (final IOException e) {
       getLogger().debug("Login failed.", e);
       final Throwable cause = e.getCause();
@@ -76,7 +77,27 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
       throw new SaslException("Unexpected failure trying to login.", cause);
     }
     if (saslServer == null) {
-      throw new SaslException("Server could not initiate authentication. Insufficient parameters?");
+      throw new SaslException(String.format("Server cannot initiate authentication using %s mechanism. Insufficient" +
+          " parameters or selected mechanism doesn't support configured security layers ?", mechanismName));
+    }
+
+    // If encryption is enabled set the backend wrapper instance corresponding to this SaslServer in the connection
+    // object. This is later used to do wrap/unwrap in handlers.
+    if (isEncryptionEnabled()) {
+      saslCodec = new SaslCodec() {
+
+        @Override
+        public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+          checkState(saslServer != null);
+          return saslServer.wrap(data, offset, len);
+        }
+
+        @Override
+        public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+          checkState(saslServer != null);
+          return saslServer.unwrap(data, offset, len);
+        }
+      };
     }
   }
 
@@ -110,7 +131,17 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
   }
 
   @Override
-  public void close() {
+  public void setEncryption(boolean encrypted) {
+    throw new UnsupportedOperationException("Changing encryption setting on server connection is not permitted.");
+  }
+
+  @Override
+  public void setMaxWrappedSize(int maxWrappedSize) {
+    throw new UnsupportedOperationException("Changing maxWrappedSize setting on server connection is not permitted.");
+  }
+
+  @Override
+  public void disposeSaslServer() {
     try {
       if (saslServer != null) {
         saslServer.dispose();
@@ -119,6 +150,18 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
     } catch (final SaslException e) {
       getLogger().warn("Unclean disposal.", e);
     }
-    super.close();
+  }
+
+  @Override
+  public void channelClosed(RpcException ex) {
+    // This will be triggered from Netty when a channel is closed. We should cleanup here
+    // as this will handle case for both client closing the connection or server closing the
+    // connection.
+    disposeSaslServer();
+
+    // Decrease the connection counter here since the close handler will be triggered
+    // for all the types of connection
+    decConnectionCounter();
+    super.channelClosed(ex);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
index 71e5a86..7d9ebec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.security.AuthStringUtil;
 import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -46,16 +47,38 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
     super(allocator, context);
 
     final DrillConfig config = context.getConfig();
+    final AuthenticatorProvider authProvider = getAuthProvider();
+
     if (config.getBoolean(ExecConstants.BIT_AUTHENTICATION_ENABLED)) {
       this.authMechanismToUse = config.getString(ExecConstants.BIT_AUTHENTICATION_MECHANISM);
       try {
-        getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+        authProvider.getAuthenticatorFactory(authMechanismToUse);
       } catch (final SaslException e) {
         throw new DrillbitStartupException(String.format(
             "'%s' mechanism not found for bit-to-bit authentication. Please check authentication configuration.",
             authMechanismToUse));
       }
-      logger.info("Configured bit-to-bit connections to require authentication using: {}", authMechanismToUse);
+
+      // Update encryption related configurations
+      encryptionContext.setEncryption(config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED));
+      final int maxWrappedSize = config.getInt(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE);
+
+      if (maxWrappedSize <= 0) {
+        throw new DrillbitStartupException(String.format("Invalid value configured for " +
+            "bit.encryption.sasl.max_wrapped_size. Must be a positive integer in bytes with a recommended max value " +
+            "of %s", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE));
+      } else if (maxWrappedSize > RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE) {
+        logger.warn("The configured value of bit.encryption.sasl.max_wrapped_size is too big. This may cause higher" +
+            " memory pressure. [Details: Recommended max value is %s]", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
+      }
+      encryptionContext.setMaxWrappedSize(maxWrappedSize);
+
+      logger.info("Configured bit-to-bit connections to require authentication using: {} with encryption: {}",
+          authMechanismToUse, encryptionContext.getEncryptionCtxtString());
+
+    } else if (config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED)) {
+      throw new DrillbitStartupException("Invalid security configuration. Encryption using SASL is enabled with " +
+          "authentication disabled. Please check the security.bit configurations.");
     } else {
       this.authMechanismToUse = null;
     }
@@ -78,7 +101,8 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
     return getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
   }
 
-  public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint) throws IOException {
+  public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint,
+                                                final Map<String, String> overrides) throws IOException {
     final DrillProperties properties = DrillProperties.createEmpty();
 
     final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
@@ -93,6 +117,8 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
         properties.setProperty(DrillProperties.SERVICE_PRINCIPAL, loginPrincipal.toString());
       }
     }
+
+    properties.merge(overrides);
     return properties.stringPropertiesAsMap();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
index 706b088..5b8a70b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
@@ -31,4 +31,7 @@ public interface ConnectionConfig {
 
   AuthenticatorProvider getAuthProvider();
 
+  boolean isEncryptionEnabled();
+
+  EncryptionContext getEncryptionCtxt();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index 6ebe1c4..a46e968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.rpc.control;
 
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
@@ -36,13 +38,14 @@ import org.apache.drill.exec.rpc.RpcCommand;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.FailingRequestHandler;
+import org.apache.drill.exec.rpc.security.SaslProperties;
 
-import com.google.protobuf.MessageLite;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake> {
@@ -105,9 +108,12 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
     if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
       final SaslClient saslClient;
       try {
+        final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+                                                                                    connection.getMaxWrappedSize());
+
         saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
             .createSaslClient(UserGroupInformation.getLoginUser(),
-                config.getSaslClientProperties(remoteEndpoint));
+                config.getSaslClientProperties(remoteEndpoint, saslProperties));
       } catch (final IOException e) {
         throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
       }
@@ -118,7 +124,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
     } else {
       if (config.getAuthMechanismToUse() != null) { // local requires authentication
         throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
-                remoteEndpoint.getAddress()));
+            remoteEndpoint.getAddress()));
       }
     }
   }
@@ -126,6 +132,9 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
   @Override
   protected void finalizeConnection(BitControlHandshake handshake, ControlConnection connection) {
     connection.setEndpoint(handshake.getEndpoint());
+
+    // Increment the Control Connection counter.
+    connection.incConnectionCounter();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a50a3b0..70189d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,15 +18,20 @@
 package org.apache.drill.exec.rpc.control;
 
 import com.google.protobuf.MessageLite;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.socket.SocketChannel;
+
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.AbstractServerConnection;
 import org.apache.drill.exec.rpc.ClientConnection;
 import org.apache.drill.exec.rpc.RequestHandler;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.SaslCodec;
+
 import org.slf4j.Logger;
 
 import javax.security.sasl.SaslClient;
@@ -115,6 +120,24 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
   public void setSaslClient(final SaslClient saslClient) {
     checkState(this.saslClient == null);
     this.saslClient = saslClient;
+
+    // If encryption is enabled set the backend wrapper instance corresponding to this SaslClient in the connection
+    // object. This is later used to do wrap/unwrap in handlers.
+    if (isEncryptionEnabled()) {
+      saslCodec = new SaslCodec() {
+        @Override
+        public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+          assert saslClient != null;
+          return saslClient.wrap(data, offset, len);
+        }
+
+        @Override
+        public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+          assert saslClient != null;
+          return saslClient.unwrap(data, offset, len);
+        }
+      };
+    }
   }
 
   @Override
@@ -124,7 +147,7 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
   }
 
   @Override
-  public void close() {
+  public void disposeSaslClient() {
     try {
       if (saslClient != null) {
         saslClient.dispose();
@@ -133,7 +156,25 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
     } catch (final SaslException e) {
       getLogger().warn("Unclean disposal", e);
     }
-    super.close();
   }
 
+  @Override
+  public void channelClosed(RpcException ex) {
+    // This will be triggered from Netty when a channel is closed. We should cleanup here
+    // as this will handle case for both client closing the connection or server closing the
+    // connection.
+    disposeSaslClient();
+
+    super.channelClosed(ex);
+  }
+
+  @Override
+  public void incConnectionCounter() {
+    ControlRpcMetrics.getInstance().addConnectionCount();
+  }
+
+  @Override
+  public void decConnectionCounter() {
+    ControlRpcMetrics.getInstance().decConnectionCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
new file mode 100644
index 0000000..ae9e7cc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to bit control rpc layer
+ */
+class ControlRpcMetrics extends AbstractRpcMetrics {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcMetrics.class);
+
+  // Total number of control connection's as client and server for a DrillBit.
+  // i.e. Sum of incoming and outgoing control connections.
+  private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "control.encrypted");
+
+  private static final Counter unencryptedConnection = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "control.unencrypted");
+
+  private static final RpcMetrics INSTANCE = new ControlRpcMetrics();
+
+  // prevent instantiation
+  private ControlRpcMetrics() {
+  }
+
+  public static RpcMetrics getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Should only be called when first access to getInstance is made. In this case inside {@link ControllerImpl}.
+   * {@link ControlConnection} using the singleton instance should not call initialize.
+   *
+   * @param useEncryptedCounter
+   * @param allocator
+   */
+  @Override
+  public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+    this.useEncryptedCounter = useEncryptedCounter;
+    registerAllocatorMetrics(allocator);
+  }
+
+  @Override
+  public void addConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.inc();
+    } else {
+      unencryptedConnection.inc();
+    }
+  }
+
+  @Override
+  public void decConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.dec();
+    } else {
+      unencryptedConnection.dec();
+    }
+  }
+
+  private void registerAllocatorMetrics(final BufferAllocator allocator) {
+    registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.control.");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 9e733df..09f6705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -17,10 +17,10 @@
  */
 package org.apache.drill.exec.rpc.control;
 
+import com.google.protobuf.MessageLite;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
 import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -30,8 +30,6 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
 
-import com.google.protobuf.MessageLite;
-
 public class ControlServer extends BasicServer<RpcType, ControlConnection>{
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
 
@@ -105,6 +103,10 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
         if (config.getAuthMechanismToUse() != null) {
           builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
         }
+
+        // Increase the Control Connection counter on server side
+        connection.incConnectionCounter();
+
         return builder.build();
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 2bf5ad3..7ce2e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -48,6 +48,9 @@ public class ControllerImpl implements Controller {
     config = new ControlConnectionConfig(allocator, context, handler);
     this.connectionRegistry = new ConnectionManagerRegistry(config);
     this.handlerRegistry = handler.getHandlerRegistry();
+
+    // Initialize the singleton instance of ControlRpcMetrics.
+    ((ControlRpcMetrics)ControlRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index a37008d..603168d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,11 +18,11 @@
 package org.apache.drill.exec.rpc.data;
 
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -34,15 +34,15 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcCommand;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-
-import com.google.protobuf.MessageLite;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
+import org.apache.drill.exec.rpc.security.SaslProperties;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> {
@@ -72,7 +72,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
   @Override
   protected DataClientConnection initRemoteConnection(SocketChannel channel) {
     super.initRemoteConnection(channel);
-    this.connection = new DataClientConnection(channel, this);
+    this.connection = new DataClientConnection(channel, this, config.getEncryptionCtxt());
     return connection;
   }
 
@@ -107,9 +107,13 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
     if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
       final SaslClient saslClient;
       try {
+
+        final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+                                                                                    connection.getMaxWrappedSize());
+
         saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
             .createSaslClient(UserGroupInformation.getLoginUser(),
-                config.getSaslClientProperties(remoteEndpoint));
+                config.getSaslClientProperties(remoteEndpoint, saslProperties));
       } catch (final IOException e) {
         throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
       }
@@ -126,6 +130,11 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
   }
 
   @Override
+  protected void finalizeConnection(BitServerHandshake handshake, DataClientConnection connection) {
+    // Increment the Data Connection counter.
+    connection.incConnectionCounter();
+  }
+
   protected <M extends MessageLite> RpcCommand<M, DataClientConnection>
   getInitialCommand(final RpcCommand<M, DataClientConnection> command) {
     final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command);

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 625ab25..6ada2f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData.RpcType;
 import org.apache.drill.exec.rpc.AbstractClientConnection;
+import org.apache.drill.exec.rpc.EncryptionContext;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
 import com.google.protobuf.MessageLite;
@@ -37,8 +38,9 @@ public class DataClientConnection extends AbstractClientConnection {
   private final DataClient client;
   private final UUID id;
 
-  public DataClientConnection(SocketChannel channel, DataClient client) {
-    super(channel, "data client");
+  public DataClientConnection(SocketChannel channel, DataClient client,
+                              EncryptionContext encryptionContextImpl) {
+    super(channel, "data client", encryptionContextImpl);
     this.client = client;
     this.id = UUID.randomUUID();
   }
@@ -88,4 +90,14 @@ public class DataClientConnection extends AbstractClientConnection {
   protected Logger getLogger() {
     return logger;
   }
+
+  @Override
+  public void incConnectionCounter() {
+    DataRpcMetrics.getInstance().addConnectionCount();
+  }
+
+  @Override
+  public void decConnectionCounter() {
+    DataRpcMetrics.getInstance().decConnectionCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 25c83b3..27b2250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -44,6 +44,9 @@ public class DataConnectionCreator implements AutoCloseable {
   public DataConnectionCreator(BootStrapContext context, BufferAllocator allocator, WorkEventBus workBus,
                                WorkerBee bee) throws DrillbitStartupException {
     config = new DataConnectionConfig(allocator, context, new DataServerRequestHandler(workBus, bee));
+
+    // Initialize the singleton instance of DataRpcMetrics.
+    ((DataRpcMetrics) DataRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
   }
 
   public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) {
@@ -59,7 +62,7 @@ public class DataConnectionCreator implements AutoCloseable {
   public DataTunnel getTunnel(DrillbitEndpoint endpoint) {
     DataConnectionManager newManager = new DataConnectionManager(endpoint, config);
     DataConnectionManager oldManager = connectionManager.putIfAbsent(endpoint, newManager);
-    if(oldManager != null){
+    if (oldManager != null) {
       newManager = oldManager;
     }
     return new DataTunnel(newManager);

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
new file mode 100644
index 0000000..997df57
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to bit data rpc layer
+ */
+class DataRpcMetrics extends AbstractRpcMetrics {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcMetrics.class);
+
+  // Total number of data connection's as client and server for a DrillBit.
+  // i.e. Sum of incoming and outgoing data connections.
+  private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "data.encrypted");
+
+  private static final Counter unencryptedConnection = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "data.unencrypted");
+
+  private static final RpcMetrics INSTANCE = new DataRpcMetrics();
+
+  // prevent instantiation
+  private DataRpcMetrics() {
+  }
+
+  public static RpcMetrics getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Should only be called when first access to getInstance is made. In this case inside {@link DataConnectionCreator}.
+   * {@link DataServerConnection} and {@link DataClientConnection} using the singleton instance should not call
+   * initialize.
+   *
+   * @param useEncryptedCounter
+   * @param allocator
+   */
+  @Override
+  public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+    this.useEncryptedCounter = useEncryptedCounter;
+    registerAllocatorMetrics(allocator);
+  }
+
+  @Override
+  public void addConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.inc();
+    } else {
+      unencryptedConnection.inc();
+    }
+  }
+
+  @Override
+  public void decConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.dec();
+    } else {
+      unencryptedConnection.dec();
+    }
+  }
+
+  private void registerAllocatorMetrics(final BufferAllocator allocator) {
+    registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.data.");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 33270fd..9e31d6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,10 +17,10 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import com.google.protobuf.MessageLite;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -31,8 +31,6 @@ import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
 import org.apache.drill.exec.rpc.RpcException;
 
-import com.google.protobuf.MessageLite;
-
 public class DataServer extends BasicServer<RpcType, DataServerConnection> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
 
@@ -84,6 +82,10 @@ public class DataServer extends BasicServer<RpcType, DataServerConnection> {
         if (config.getAuthMechanismToUse() != null) {
           builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
         }
+
+        // Increase the Data Connection counter on server side.
+        connection.incConnectionCounter();
+
         return builder.build();
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
index 70e262f..41a4b1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
@@ -39,4 +39,13 @@ public class DataServerConnection extends AbstractServerConnection<DataServerCon
     return logger;
   }
 
+  @Override
+  public void incConnectionCounter() {
+    DataRpcMetrics.getInstance().addConnectionCount();
+  }
+
+  @Override
+  public void decConnectionCounter() {
+    DataRpcMetrics.getInstance().decConnectionCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
index 9c74ddc..7f51142 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import java.io.IOException;
@@ -55,7 +56,8 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
   private static final org.slf4j.Logger logger =
       org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
 
-  private static final ImmutableMap<SaslStatus, SaslChallengeProcessor> CHALLENGE_PROCESSORS;
+  private static final ImmutableMap<SaslStatus, SaslChallengeProcessor>
+      CHALLENGE_PROCESSORS;
   static {
     final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
     map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
@@ -99,7 +101,7 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
               .setData(responseData)
               .build(),
           SaslMessage.class,
-          true /** the connection will not be backed up at this point */);
+          true /* the connection will not be backed up at this point */);
       logger.trace("Initiated SASL exchange.");
     } catch (final Exception e) {
       completionListener.failed(RpcException.mapException(e));
@@ -120,19 +122,24 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
           new SaslException("Server sent a corrupt message.")));
     } else {
       try {
-        final SaslChallengeContext context = new SaslChallengeContext(value, connection.getSaslClient(), ugi);
-
+        final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection);
         final SaslMessage saslResponse = processor.process(context);
 
         if (saslResponse != null) {
           client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
               connection, saslRpcType, saslResponse, SaslMessage.class,
-              true /** the connection will not be backed up at this point */);
+              true /* the connection will not be backed up at this point */);
         } else {
           // success
           completionListener.success(null, null);
+          if (logger.isTraceEnabled()) {
+            logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}",
+                connection.getSaslClient().getMechanismName(), connection.getEncryptionCtxtString());
+          }
         }
       } catch (final Exception e) {
+        logger.error("Authentication with encryption context: {} using mechanism {} failed with {}",
+            connection.getEncryptionCtxtString(), connection.getSaslClient().getMechanismName(), e.getMessage());
         completionListener.failed(RpcException.mapException(e));
       }
     }
@@ -143,16 +150,16 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
     completionListener.interrupted(e);
   }
 
-  private static class SaslChallengeContext {
+  private static class SaslChallengeContext<C extends ClientConnection> {
 
     final SaslMessage challenge;
-    final SaslClient saslClient;
     final UserGroupInformation ugi;
+    final C connection;
 
-    SaslChallengeContext(SaslMessage challenge, SaslClient saslClient, UserGroupInformation ugi) {
+    SaslChallengeContext(SaslMessage challenge, UserGroupInformation ugi, C connection) {
       this.challenge = checkNotNull(challenge);
-      this.saslClient = checkNotNull(saslClient);
       this.ugi = checkNotNull(ugi);
+      this.connection = checkNotNull(connection);
     }
   }
 
@@ -165,22 +172,24 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
      *
      * @param context challenge context
      * @return response
-     * @throws Exception
+     * @throws Exception in case of any failure
      */
-    SaslMessage process(SaslChallengeContext context) throws Exception;
+    <CC extends ClientConnection>
+    SaslMessage process(SaslChallengeContext<CC> context) throws Exception;
 
   }
 
   private static class SaslInProgressProcessor implements SaslChallengeProcessor {
 
     @Override
-    public SaslMessage process(SaslChallengeContext context) throws Exception {
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
       final SaslMessage.Builder response = SaslMessage.newBuilder();
+      final SaslClient saslClient = context.connection.getSaslClient();
 
-      final byte[] responseBytes = evaluateChallenge(context.ugi, context.saslClient,
+      final byte[] responseBytes = evaluateChallenge(context.ugi, saslClient,
           context.challenge.getData().toByteArray());
 
-      final boolean isComplete = context.saslClient.isComplete();
+      final boolean isComplete = saslClient.isComplete();
       logger.trace("Evaluated challenge. Completed? {}.", isComplete);
       response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY);
       // if isComplete, the client will get one more response from server
@@ -192,20 +201,18 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
   private static class SaslSuccessProcessor implements SaslChallengeProcessor {
 
     @Override
-    public SaslMessage process(SaslChallengeContext context) throws Exception {
-      if (context.saslClient.isComplete()) {
-        logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
-        // setup security layers here..
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+      final SaslClient saslClient = context.connection.getSaslClient();
+
+      if (saslClient.isComplete()) {
+        handleSuccess(context);
         return null;
       } else {
-
         // server completed before client; so try once, fail otherwise
-        evaluateChallenge(context.ugi, context.saslClient,
-            context.challenge.getData().toByteArray()); // discard response
+        evaluateChallenge(context.ugi, saslClient, context.challenge.getData().toByteArray()); // discard response
 
-        if (context.saslClient.isComplete()) {
-          logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
-          // setup security layers here..
+        if (saslClient.isComplete()) {
+          handleSuccess(context);
           return null;
         } else {
           throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
@@ -217,8 +224,9 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
   private static class SaslFailedProcessor implements SaslChallengeProcessor {
 
     @Override
-    public SaslMessage process(SaslChallengeContext context) throws Exception {
-      throw new SaslException("Authentication failed. Incorrect credentials?");
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+      throw new SaslException(String.format("Authentication failed. Incorrect credentials? [Details: %s]",
+          context.connection.getEncryptionCtxtString()));
     }
   }
 
@@ -243,4 +251,48 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
       }
     }
   }
+
+
+  private static <CC extends ClientConnection> void handleSuccess(SaslChallengeContext<CC> context) throws
+      SaslException {
+    final CC connection = context.connection;
+    final SaslClient saslClient = connection.getSaslClient();
+
+    try {
+      // Check if connection was marked for being secure then verify for negotiated QOP value for
+      // correctness.
+      final String negotiatedQOP = saslClient.getNegotiatedProperty(Sasl.QOP).toString();
+      final String expectedQOP = connection.isEncryptionEnabled()
+          ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+          : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+      if (!(negotiatedQOP.equals(expectedQOP))) {
+        throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+            negotiatedQOP, expectedQOP));
+      }
+
+      // Update the rawWrapChunkSize with the negotiated buffer size since we cannot call encode with more than
+      // negotiated size of buffer.
+      if (connection.isEncryptionEnabled()) {
+        final int negotiatedRawSendSize = Integer.parseInt(
+            saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+        if (negotiatedRawSendSize <= 0) {
+          throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+              "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+              negotiatedRawSendSize));
+        }
+        connection.setWrapSizeLimit(negotiatedRawSendSize);
+      }
+    } catch (Exception e) {
+      throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+          e.getMessage()), e);
+    }
+
+    if (connection.isEncryptionEnabled()) {
+      connection.addSecurityHandlers();
+    } else {
+      // Encryption is not required hence we don't need to hold on to saslClient object.
+      connection.disposeSaslClient();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
index f4c60e7..cfb9512 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
@@ -137,5 +137,4 @@ public class AuthenticatorProviderImpl implements AuthenticatorProvider {
     AutoCloseables.close(authFactories.values());
     authFactories.clear();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
index bdcbcf5..5cac208 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
@@ -48,7 +48,7 @@ public class ClientAuthenticatorProvider implements AuthenticatorProvider {
   // Mapping: simple name -> authenticator factory
   private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
 
-  public ClientAuthenticatorProvider() {
+  private ClientAuthenticatorProvider() {
     // factories provided by Drill
     final KerberosFactory kerberosFactory = new KerberosFactory();
     authFactories.put(kerberosFactory.getSimpleName(), kerberosFactory);

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
new file mode 100644
index 0000000..9ed85ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.Sasl;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class SaslProperties {
+
+  /**
+   * All supported Quality of Protection values which can be negotiated
+   */
+  enum QualityOfProtection {
+    AUTHENTICATION("auth"),
+    INTEGRITY("auth-int"),
+    PRIVACY("auth-conf");
+
+    public final String saslQop;
+
+    QualityOfProtection(String saslQop) {
+      this.saslQop = saslQop;
+    }
+
+    public String getSaslQop() {
+      return saslQop;
+    }
+  }
+
+  /**
+   * Get's the map of minimum set of SaslProperties required during negotiation process either for encryption
+   * or authentication
+   * @param encryptionEnabled - Flag to determine if property needed is for encryption or authentication
+   * @param wrappedChunkSize  - Configured wrappedChunkSize to negotiate for.
+   * @return Map of SaslProperties which will be used in negotiation.
+   */
+  public static Map<String, String> getSaslProperties(boolean encryptionEnabled, int wrappedChunkSize) {
+    Map<String, String> saslProps = new HashMap<>();
+
+    if (encryptionEnabled) {
+      saslProps.put(Sasl.STRENGTH, "high");
+      saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+      saslProps.put(Sasl.MAX_BUFFER, Integer.toString(wrappedChunkSize));
+      saslProps.put(Sasl.POLICY_NOPLAINTEXT, "true");
+    } else {
+      saslProps.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.getSaslQop());
+    }
+
+    return saslProps;
+  }
+
+  private SaslProperties() {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
index bf34d57..ddd216f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.ServerConnection;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
@@ -87,7 +88,7 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
       try {
         saslResponse = SaslMessage.PARSER.parseFrom(new ByteBufInputStream(pBody));
       } catch (final InvalidProtocolBufferException e) {
-        handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+        handleAuthFailure(connection, sender, e, saslResponseType);
         return;
       }
 
@@ -95,17 +96,17 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
       final SaslResponseProcessor processor = RESPONSE_PROCESSORS.get(saslResponse.getStatus());
       if (processor == null) {
         logger.info("Unknown message type from client from {}. Will stop authentication.", remoteAddress);
-        handleAuthFailure(remoteAddress, sender, new SaslException("Received unexpected message"),
+        handleAuthFailure(connection, sender, new SaslException("Received unexpected message"),
             saslResponseType);
         return;
       }
 
-      final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, remoteAddress,
-          sender, requestHandler, saslResponseType);
+      final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, sender,
+          requestHandler, saslResponseType);
       try {
         processor.process(context);
       } catch (final Exception e) {
-        handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+        handleAuthFailure(connection, sender, e, saslResponseType);
       }
     } else {
 
@@ -115,9 +116,9 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
       // but the client should not be making any requests before authenticating.
       // drop connection
       throw new RpcException(
-          String.format("Request of type %d is not allowed without authentication. " +
-                  "Client on %s must authenticate before making requests. Connection dropped.",
-              rpcType, remoteAddress));
+          String.format("Request of type %d is not allowed without authentication. Client on %s must authenticate " +
+              "before making requests. Connection dropped. [Details: %s]",
+              rpcType, remoteAddress, connection.getEncryptionCtxtString()));
     }
   }
 
@@ -125,16 +126,14 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
 
     final SaslMessage saslResponse;
     final S connection;
-    final String remoteAddress;
     final ResponseSender sender;
     final RequestHandler<S> requestHandler;
     final T saslResponseType;
 
-    SaslResponseContext(SaslMessage saslResponse, S connection, String remoteAddress, ResponseSender sender,
+    SaslResponseContext(SaslMessage saslResponse, S connection, ResponseSender sender,
                         RequestHandler<S> requestHandler, T saslResponseType) {
       this.saslResponse = checkNotNull(saslResponse);
       this.connection = checkNotNull(connection);
-      this.remoteAddress = checkNotNull(remoteAddress);
       this.sender = checkNotNull(sender);
       this.requestHandler = checkNotNull(requestHandler);
       this.saslResponseType = checkNotNull(saslResponseType);
@@ -208,8 +207,11 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
 
         handleSuccess(context, challenge, saslServer);
       } else {
-        logger.info("Failed to authenticate client from {}", context.remoteAddress);
-        throw new SaslException("Client allegedly succeeded authentication, but server did not. Suspicious?");
+        final S connection = context.connection;
+        logger.info("Failed to authenticate client from {} with encryption context:{}",
+            connection.getRemoteAddress().toString(), connection.getEncryptionCtxtString());
+        throw new SaslException(String.format("Client allegedly succeeded authentication but server did not. " +
+            "Suspicious? [Details: %s]", connection.getEncryptionCtxtString()));
       }
     }
   }
@@ -219,9 +221,11 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
     @Override
     public <S extends ServerConnection<S>, T extends EnumLite>
     void process(SaslResponseContext<S, T> context) throws Exception {
-      logger.info("Client from {} failed authentication graciously, and does not want to continue.",
-          context.remoteAddress);
-      throw new SaslException("Client graciously failed authentication");
+      final S connection = context.connection;
+      logger.info("Client from {} failed authentication with encryption context:{} graciously, and does not want to " +
+          "continue.", connection.getRemoteAddress().toString(), connection.getEncryptionCtxtString());
+      throw new SaslException(String.format("Client graciously failed authentication. [Details: %s]",
+          connection.getEncryptionCtxtString()));
     }
   }
 
@@ -251,25 +255,67 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
   private static <S extends ServerConnection<S>, T extends EnumLite>
   void handleSuccess(final SaslResponseContext<S, T> context, final SaslMessage.Builder challenge,
                      final SaslServer saslServer) throws IOException {
-    context.connection.changeHandlerTo(context.requestHandler);
-    context.connection.finalizeSaslSession();
-    context.sender.send(new Response(context.saslResponseType, challenge.build()));
 
-    // setup security layers here..
+    final S connection = context.connection;
+    connection.changeHandlerTo(context.requestHandler);
+    connection.finalizeSaslSession();
+
+    // Check the negotiated property before sending the response back to client
+    try {
+      final String negotiatedQOP = saslServer.getNegotiatedProperty(Sasl.QOP).toString();
+      final String expectedQOP = (connection.isEncryptionEnabled())
+          ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+          : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+      if (!(negotiatedQOP.equals(expectedQOP))) {
+        throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+            negotiatedQOP, expectedQOP));
+      }
+
+      // Update the rawWrapSendSize with the negotiated rawSendSize since we cannot call encode with more than the
+      // negotiated size of buffer
+      if (connection.isEncryptionEnabled()) {
+        final int negotiatedRawSendSize = Integer.parseInt(
+            saslServer.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+        if (negotiatedRawSendSize <= 0) {
+          throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+              "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+              negotiatedRawSendSize));
+        }
+        connection.setWrapSizeLimit(negotiatedRawSendSize);
+      }
+    } catch (IllegalStateException | NumberFormatException e) {
+      throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+          e.getMessage()), e);
+    }
 
     if (logger.isTraceEnabled()) {
-      logger.trace("Authenticated {} successfully using {} from {}", saslServer.getAuthorizationID(),
-          saslServer.getMechanismName(), context.remoteAddress);
+      logger.trace("Authenticated {} successfully using {} from {} with encryption context {}",
+          saslServer.getAuthorizationID(), saslServer.getMechanismName(), connection.getRemoteAddress().toString(),
+          connection.getEncryptionCtxtString());
+    }
+
+    // All checks have passed let's send the response back to client before adding handlers.
+    context.sender.send(new Response(context.saslResponseType, challenge.build()));
+
+    if (connection.isEncryptionEnabled()) {
+      connection.addSecurityHandlers();
+    } else {
+      // Encryption is not required hence we don't need to hold on to saslServer object.
+      connection.disposeSaslServer();
     }
   }
 
   private static final SaslMessage SASL_FAILED_MESSAGE =
       SaslMessage.newBuilder().setStatus(SaslStatus.SASL_FAILED).build();
 
-  private static <T extends EnumLite>
-  void handleAuthFailure(final String remoteAddress, final ResponseSender sender,
+  private static <S extends ServerConnection<S>, T extends EnumLite>
+  void handleAuthFailure(final S connection, final ResponseSender sender,
                          final Exception e, final T saslResponseType) throws RpcException {
-    logger.debug("Authentication failed from client {} due to {}", remoteAddress, e);
+    final String remoteAddress = connection.getRemoteAddress().toString();
+
+    logger.debug("Authentication using mechanism {} with encryption context {} failed from client {} due to {}",
+        connection.getSaslServer().getMechanismName(), connection.getEncryptionCtxtString(), remoteAddress, e);
 
     // inform the client that authentication failed, and no more
     sender.send(new Response(saslResponseType, SASL_FAILED_MESSAGE));

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
index 855dd8b..e14d411 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -33,6 +33,7 @@ import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.LoginException;
 import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
@@ -93,6 +94,7 @@ public class KerberosFactory implements AuthenticatorFactory {
   @Override
   public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
       throws SaslException {
+    final String qopValue = properties.containsKey(Sasl.QOP) ? properties.get(Sasl.QOP).toString() : "auth";
     try {
       final String primaryName = ugi.getShortUserName();
       final String instanceName = new HadoopKerberosName(ugi.getUserName()).getHostName();
@@ -105,7 +107,7 @@ public class KerberosFactory implements AuthenticatorFactory {
                   new KerberosServerCallbackHandler());
         }
       });
-      logger.trace("GSSAPI SaslServer created.");
+      logger.trace("GSSAPI SaslServer created with QOP {}.", qopValue);
       return saslServer;
     } catch (final UndeclaredThrowableException e) {
       final Throwable cause = e.getCause();
@@ -113,11 +115,13 @@ public class KerberosFactory implements AuthenticatorFactory {
       if (cause instanceof SaslException) {
         throw (SaslException) cause;
       } else {
-        throw new SaslException("Unexpected failure trying to authenticate using Kerberos", cause);
+        throw new SaslException(String.format("Unexpected failure trying to authenticate using Kerberos with QOP %s",
+            qopValue), cause);
       }
     } catch (final IOException | InterruptedException e) {
       logger.debug("Authentication failed.", e);
-      throw new SaslException("Unexpected failure trying to authenticate using Kerberos", e);
+      throw new SaslException(String.format("Unexpected failure trying to authenticate using Kerberos with QOP %s",
+          qopValue), e);
     }
   }
 
@@ -129,6 +133,8 @@ public class KerberosFactory implements AuthenticatorFactory {
     final String parts[] = KerberosUtil.splitPrincipalIntoParts(servicePrincipal);
     final String serviceName = parts[0];
     final String serviceHostName = parts[1];
+    final String qopValue = properties.containsKey(Sasl.QOP) ? properties.get(Sasl.QOP).toString() : "auth";
+
     // ignore parts[2]; GSSAPI gets the realm info from the ticket
     try {
       final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
@@ -146,20 +152,20 @@ public class KerberosFactory implements AuthenticatorFactory {
               });
         }
       });
-      logger.debug("GSSAPI SaslClient created to authenticate to {} running on {}",
-          serviceName, serviceHostName);
+      logger.debug("GSSAPI SaslClient created to authenticate to {} running on {} with QOP value {}",
+          serviceName, serviceHostName, qopValue);
       return saslClient;
     } catch (final UndeclaredThrowableException e) {
       logger.debug("Authentication failed.", e);
-      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
-          serviceHostName), e.getCause());
+      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI with QOP %s",
+          serviceHostName, qopValue), e.getCause());
     } catch (final IOException | InterruptedException e) {
       logger.debug("Authentication failed.", e);
       if (e instanceof SaslException) {
         throw (SaslException) e;
       }
-      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
-          serviceHostName), e);
+      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI with QOP %s",
+          serviceHostName, qopValue), e);
     }
   }