You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:44 UTC

[3/6] drill git commit: DRILL-4335: Apache Drill should support network encryption.

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a7ea7b7..2f47538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -69,6 +69,7 @@ import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
 import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
 import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider;
 import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.rpc.security.SaslProperties;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 
@@ -81,6 +82,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.MessageLite;
 
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -137,18 +139,32 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
    */
   public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties,
                       final UserCredentials credentials) throws RpcException {
-    final UserToBitHandshake handshake = UserToBitHandshake.newBuilder()
+    final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
         .setSupportComplexTypes(supportComplexTypes)
         .setSupportTimeout(true)
         .setCredentials(credentials)
         .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
-        .setSaslSupport(SaslSupport.SASL_AUTH)
-        .setProperties(properties.serializeForServer())
-        .build();
+        .setSaslSupport(SaslSupport.SASL_PRIVACY)
+        .setProperties(properties.serializeForServer());
+
+    // Only used for testing purpose
+    if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) {
+      hsBuilder.setSaslSupport(SaslSupport.valueOf(
+          Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
+    }
+
+    connect(hsBuilder.build(), endpoint).checkedGet();
 
-    connect(handshake, endpoint).checkedGet();
+    // Check if client needs encryption and server is not configured for encryption.
+    final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT)
+        && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
+
+    if(clientNeedsEncryption && !connection.isEncryptionEnabled()) {
+      throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " +
+          "encryption. Please check connection parameter or contact your administrator");
+    }
 
     if (serverAuthMechanisms != null) {
       try {
@@ -192,6 +208,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
   private CheckedFuture<Void, SaslException> authenticate(final DrillProperties properties) {
     final Map<String, String> propertiesMap = properties.stringPropertiesAsMap();
 
+    // Set correct QOP property and Strength based on server needs encryption or not.
+    // If ChunkMode is enabled then negotiate for buffer size equal to wrapChunkSize,
+    // If ChunkMode is disabled then negotiate for MAX_WRAPPED_SIZE buffer size.
+    propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+                                                          connection.getMaxWrappedSize()));
+
     final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException
     final CheckedFuture<Void, SaslException> authFuture =
         new AbstractCheckedFuture<Void, SaslException>(authSettable) {
@@ -201,10 +223,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
             if (e instanceof ExecutionException) {
               final Throwable cause = Throwables.getRootCause(e);
               if (cause instanceof SaslException) {
-                return new SaslException("Authentication failed: " + cause.getMessage(), cause);
+                return new SaslException(String.format("Authentication failed. [Details: %s, Error %s]",
+                    connection.getEncryptionCtxtString(), cause.getMessage()), cause);
               }
             }
-            return new SaslException("Authentication failed unexpectedly.", e);
+            return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]",
+                connection.getEncryptionCtxtString(), e.getMessage()), e);
           }
         };
 
@@ -215,11 +239,13 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
     try {
       factory = getAuthenticatorFactory(properties);
       mechanismName = factory.getSimpleName();
-      logger.trace("Will try to authenticate to server using {} mechanism.", mechanismName);
+      logger.trace("Will try to authenticate to server using {} mechanism with encryption context {}",
+          mechanismName, connection.getEncryptionCtxtString());
       ugi = factory.createAndLoginUser(propertiesMap);
       saslClient = factory.createSaslClient(ugi, propertiesMap);
       if (saslClient == null) {
-        throw new SaslException("Cannot initiate authentication. Insufficient credentials?");
+        throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " +
+            "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName()));
       }
       connection.setSaslClient(saslClient);
     } catch (final IOException e) {
@@ -255,13 +281,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
     // first, check if a certain mechanism must be used
     String authMechanism = properties.getProperty(DrillProperties.AUTH_MECHANISM);
     if (authMechanism != null) {
-      if (!ClientAuthenticatorProvider.getInstance()
-          .containsFactory(authMechanism)) {
+      if (!ClientAuthenticatorProvider.getInstance().containsFactory(authMechanism)) {
         throw new SaslException(String.format("Unknown mechanism: %s", authMechanism));
       }
       if (!mechanismSet.contains(authMechanism.toUpperCase())) {
-        throw new SaslException(String.format("Server does not support authentication using: %s",
-            authMechanism));
+        throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]",
+            authMechanism, connection.getEncryptionCtxtString()));
       }
       return ClientAuthenticatorProvider.getInstance()
           .getAuthenticatorFactory(authMechanism);
@@ -282,8 +307,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
           .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
     }
 
-    throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?",
-        serverAuthMechanisms));
+    throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " +
+        "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString()));
   }
 
   protected <SEND extends MessageLite, RECEIVE extends MessageLite>
@@ -331,8 +356,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
     if (!authComplete) {
       // Remote should not be making any requests before authenticating, drop connection
       throw new RpcException(String.format("Request of type %d is not allowed without authentication. " +
-                  "Remote on %s must authenticate before making requests. Connection dropped.",
-              rpcType, connection.getRemoteAddress()));
+          "Remote on %s must authenticate before making requests. Connection dropped.",
+          rpcType, connection.getRemoteAddress()));
     }
     switch (rpcType) {
     case RpcType.QUERY_DATA_VALUE:
@@ -361,8 +386,14 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
       break;
     case AUTH_REQUIRED: {
       authComplete = false;
-      logger.trace("Server requires authentication before proceeding.");
       serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
+      connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
+
+      if (inbound.hasMaxWrappedSize()) {
+        connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+      }
+      logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.",
+          connection.getEncryptionCtxtString()));
       break;
     }
     case AUTH_FAILED:
@@ -384,6 +415,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
   public class UserToBitConnection extends AbstractClientConnection {
 
     UserToBitConnection(SocketChannel channel) {
+
+      // By default the connection is not set for encryption. After receiving the AUTH_REQUIRED handshake msg
+      // from the server, the encryption flag and the max wrapped size are set on this connection.
       super(channel, "user client");
     }
 
@@ -396,6 +430,16 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
     protected Logger getLogger() {
       return logger;
     }
+
+    @Override
+    public void incConnectionCounter() {
+      // no-op: client side keeps no connection count; the override in UserServer updates UserRpcMetrics instead
+    }
+
+    @Override
+    public void decConnectionCounter() {
+      // no-op: client side keeps no connection count; the override in UserServer updates UserRpcMetrics instead
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 49a866b..64ac6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.rpc.user;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.AbstractConnectionConfig;
 import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
 import org.apache.drill.exec.server.BootStrapContext;
 
 // config for bit to user connection
@@ -35,24 +38,46 @@ class UserConnectionConfig extends AbstractConnectionConfig {
   private final UserServerRequestHandler handler;
 
   UserConnectionConfig(BufferAllocator allocator, BootStrapContext context, UserServerRequestHandler handler)
-      throws DrillbitStartupException {
+    throws DrillbitStartupException {
     super(allocator, context);
     this.handler = handler;
 
-    if (context.getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
-      if (getAuthProvider().getAllFactoryNames().isEmpty()) {
+    final DrillConfig config = context.getConfig();
+    final AuthenticatorProvider authProvider = getAuthProvider();
+
+    if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
+      if (authProvider.getAllFactoryNames().isEmpty()) {
         throw new DrillbitStartupException("Authentication enabled, but no mechanisms found. Please check " +
             "authentication configuration.");
       }
       authEnabled = true;
-      logger.info("Configured all user connections to require authentication using: {}",
-          getAuthProvider().getAllFactoryNames());
+
+      // Update encryption related parameters (SASL encryption flag and max wrapped size).
+      encryptionContext.setEncryption(config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED));
+      final int maxWrappedSize = config.getInt(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE);
+
+      if (maxWrappedSize <= 0) {
+        throw new DrillbitStartupException(String.format("Invalid value configured for " +
+            "user.encryption.sasl.max_wrapped_size. Must be a positive integer in bytes with a recommended max value " +
+            "of %s", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE));
+      } else if (maxWrappedSize > RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE) {
+        logger.warn("The configured value of user.encryption.sasl.max_wrapped_size is too big. This may cause higher" +
+            " memory pressure. [Details: Recommended max value is {}]", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
+      }
+      encryptionContext.setMaxWrappedSize(maxWrappedSize);
+
+      logger.info("Configured all user connections to require authentication with encryption: {} using: {}",
+          encryptionContext.getEncryptionCtxtString(), authProvider.getAllFactoryNames());
+    } else if (config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED)) {
+      throw new DrillbitStartupException("Invalid security configuration. Encryption using SASL is enabled with " +
+          "authentication disabled. Please check the security.user configurations.");
     } else {
       authEnabled = false;
     }
 
-    impersonationManager = !context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED) ? null :
-        new InboundImpersonationManager();
+    impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
+        ? null
+        : new InboundImpersonationManager();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
new file mode 100644
index 0000000..ab93e3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to the bit user rpc layer: user connection counters and allocator metrics.
+ */
+class UserRpcMetrics extends AbstractRpcMetrics {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcMetrics.class);
+
+  // Counts of user client connections to a Drillbit, tracked separately as encrypted and unencrypted.
+  private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "user.encrypted");
+
+  private static final Counter unencryptedConnections = DrillMetrics.getRegistry()
+      .counter(CONNECTION_COUNTER_PREFIX + "user.unencrypted");
+
+  private static final RpcMetrics INSTANCE = new UserRpcMetrics();
+
+  // prevent instantiation outside of the singleton INSTANCE
+  private UserRpcMetrics() {
+  }
+
+  public static RpcMetrics getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Should only be called when first access to getInstance is made. In this case inside {@link UserServer}.
+   * BitToUserConnection using the singleton instance should not call initialize.
+   *
+   * @param useEncryptedCounter true to count connections on the encrypted counter, false for the unencrypted one
+   * @param allocator allocator whose metrics are registered under the "bit.user." allocator metrics prefix
+   */
+  @Override
+  public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+    this.useEncryptedCounter = useEncryptedCounter;
+    registerAllocatorMetrics(allocator);
+  }
+
+
+  @Override
+  public void addConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.inc();
+    } else {
+      unencryptedConnections.inc();
+    }
+  }
+
+  @Override
+  public void decConnectionCount() {
+    if (useEncryptedCounter) {
+      encryptedConnections.dec();
+    } else {
+      unencryptedConnections.dec();
+    }
+  }
+
+  private void registerAllocatorMetrics(final BufferAllocator allocator) {
+    registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.user.");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9f0d502..543145f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.OutOfMemoryHandler;
 import org.apache.drill.exec.rpc.OutboundRpcMessage;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
+import org.apache.drill.exec.rpc.RpcConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
@@ -78,6 +79,9 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
         eventLoopGroup);
     this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
     this.userWorker = worker;
+
+    // Initialize Singleton instance of UserRpcMetrics.
+    ((UserRpcMetrics)UserRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
   }
 
   @Override
@@ -149,7 +153,7 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
     }
 
     void disableReadTimeout() {
-      getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
+      getChannel().pipeline().remove(RpcConstants.TIMEOUT_HANDLER);
     }
 
     void setHandshake(final UserToBitHandshake inbound) {
@@ -186,6 +190,10 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
       if (config.getImpersonationManager() != null && targetName != null) {
         config.getImpersonationManager().replaceUserOnSession(targetName, session);
       }
+
+      // Increase the corresponding connection counter.
+      // For older clients we call this method directly.
+      incConnectionCounter();
     }
 
     @Override
@@ -237,6 +245,16 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
       cleanup();
       super.close();
     }
+
+    @Override
+    public void incConnectionCounter() {
+      UserRpcMetrics.getInstance().addConnectionCount();
+    }
+
+    @Override
+    public void decConnectionCounter() {
+      UserRpcMetrics.getInstance().decConnectionCount();
+    }
   }
 
   @Override
@@ -295,7 +313,16 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
           }
 
           final boolean clientSupportsSasl = inbound.hasSaslSupport() &&
-              (inbound.getSaslSupport().ordinal() >= SaslSupport.SASL_AUTH.ordinal());
+              (inbound.getSaslSupport().ordinal() > SaslSupport.UNKNOWN_SASL_SUPPORT.ordinal());
+
+          final int saslSupportOrdinal = (clientSupportsSasl) ? inbound.getSaslSupport().ordinal()
+                                                              : SaslSupport.UNKNOWN_SASL_SUPPORT.ordinal();
+
+          if (saslSupportOrdinal <= SaslSupport.SASL_AUTH.ordinal() && config.isEncryptionEnabled()) {
+            throw new UserAuthenticationException("The server doesn't allow client without encryption support." +
+                " Please upgrade your client or talk to your system administrator.");
+          }
+
           if (!clientSupportsSasl) { // for backward compatibility < 1.10
             final String userName = inbound.getCredentials().getUserName();
             if (logger.isTraceEnabled()) {
@@ -335,9 +362,14 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
             }
           }
 
-          // mention server's authentication capabilities
+          // Offer all the configured mechanisms to client. If certain mechanism doesn't support encryption
+          // like PLAIN, those should fail during the SASL handshake negotiation.
           respBuilder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
 
+          // set the encrypted flag in handshake message. For older clients this field is optional so will be ignored
+          respBuilder.setEncrypted(connection.isEncryptionEnabled());
+          respBuilder.setMaxWrappedSize(connection.getMaxWrappedSize());
+
           // for now, this means PLAIN credentials will be sent over twice
           // (during handshake and during sasl exchange)
           respBuilder.setStatus(HandshakeStatus.AUTH_REQUIRED);

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
index c7a1338..a79c1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
@@ -46,6 +46,12 @@ public class UserAuthenticatorFactory {
    */
   public static UserAuthenticator createAuthenticator(final DrillConfig config, ScanResult scan)
       throws DrillbitStartupException {
+
+    if(!config.hasPath(USER_AUTHENTICATOR_IMPL)) {
+      throw new DrillbitStartupException(String.format("BOOT option '%s' is missing in config.",
+          USER_AUTHENTICATOR_IMPL));
+    }
+
     final String authImplConfigured = config.getString(USER_AUTHENTICATOR_IMPL);
 
     if (Strings.isNullOrEmpty(authImplConfigured)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index ba0f212..84c471e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -29,6 +29,8 @@ import javax.xml.bind.annotation.XmlRootElement;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
 import org.apache.drill.exec.work.WorkManager;
@@ -61,6 +63,10 @@ public class DrillRoot {
     final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint();
     final String currentVersion = currentDrillbit.getVersion();
 
+    final DrillConfig config = work.getContext().getConfig();
+    final boolean userEncryptionEnabled = config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED);
+    final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
+
     for (DrillbitEndpoint endpoint : work.getContext().getBits()) {
       final DrillbitInfo drillbit = new DrillbitInfo(endpoint,
               currentDrillbit.equals(endpoint),
@@ -71,7 +77,8 @@ public class DrillRoot {
       drillbits.add(drillbit);
     }
 
-    return new ClusterInfo(drillbits, currentVersion, mismatchedVersions);
+    return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
+      userEncryptionEnabled, bitEncryptionEnabled);
   }
 
   @XmlRootElement
@@ -79,14 +86,20 @@ public class DrillRoot {
     private final Collection<DrillbitInfo> drillbits;
     private final String currentVersion;
     private final Collection<String> mismatchedVersions;
+    private final boolean userEncryptionEnabled;
+    private final boolean bitEncryptionEnabled;
 
     @JsonCreator
     public ClusterInfo(Collection<DrillbitInfo> drillbits,
                        String currentVersion,
-                       Collection<String> mismatchedVersions) {
+                       Collection<String> mismatchedVersions,
+                       boolean userEncryption,
+                       boolean bitEncryption) {
       this.drillbits = Sets.newTreeSet(drillbits);
       this.currentVersion = currentVersion;
       this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
+      this.userEncryptionEnabled = userEncryption;
+      this.bitEncryptionEnabled = bitEncryption;
     }
 
     public Collection<DrillbitInfo> getDrillbits() {
@@ -100,6 +113,10 @@ public class DrillRoot {
     public Collection<String> getMismatchedVersions() {
       return Sets.newTreeSet(mismatchedVersions);
     }
+
+    public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
+
+    public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
   }
 
   public static class DrillbitInfo implements Comparable<DrillbitInfo> {

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 9fa3c27..07c54ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.service;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import io.netty.buffer.PooledByteBufAllocatorL;
 import io.netty.channel.EventLoopGroup;
 
@@ -29,12 +30,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.rpc.TransportCheck;
 import org.apache.drill.exec.rpc.control.Controller;
@@ -44,8 +43,6 @@ import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.WorkManager;
 
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Stopwatch;
 
 public class ServiceEngine implements AutoCloseable {
@@ -84,53 +81,8 @@ public class ServiceEngine implements AutoCloseable {
     intialUserPort = context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT);
     this.allowPortHunting = allowPortHunting;
     this.isDistributedMode = isDistributedMode;
-
-    registerMetrics(context.getMetrics());
   }
 
-  private void registerMetrics(final MetricRegistry registry) {
-    final String prefix = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
-    DrillMetrics.register(prefix + "user.used", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return userAllocator.getAllocatedMemory();
-      }
-    });
-    DrillMetrics.register(prefix + "user.peak", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return userAllocator.getPeakMemoryAllocation();
-      }
-    });
-    DrillMetrics.register(prefix + "bit.control.used", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return controlAllocator.getAllocatedMemory();
-      }
-    });
-    DrillMetrics.register(prefix + "bit.control.peak", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return controlAllocator.getPeakMemoryAllocation();
-      }
-    });
-
-    DrillMetrics.register(prefix + "bit.data.used", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return dataAllocator.getAllocatedMemory();
-      }
-    });
-    DrillMetrics.register(prefix + "bit.data.peak", new Gauge<Long>() {
-      @Override
-      public Long getValue() {
-        return dataAllocator.getPeakMemoryAllocation();
-      }
-    });
-
-  }
-
-
   private static BufferAllocator newAllocator(
       BootStrapContext context, String name, String initReservation, String maxAllocation) {
     return context.getAllocator().newChildAllocator(

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 3d66d19..19e1b1f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -152,6 +152,14 @@ drill.exec: {
     enabled: false
     use_login_principal: false
   }
+  security.user.encryption.sasl {
+    enabled : false,
+    max_wrapped_size : 65536
+  }
+  security.bit.encryption.sasl {
+    enabled : false,
+    max_wrapped_size : 65536
+  }
   trace: {
     directory: "/tmp/drill-trace",
     filesystem: "file:///"

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/resources/rest/index.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl
index 3175479..d1aa844 100644
--- a/exec/java-exec/src/main/resources/rest/index.ftl
+++ b/exec/java-exec/src/main/resources/rest/index.ftl
@@ -69,6 +69,27 @@
           </tbody>
         </table>
       </div>
+    </div>
+  </div>
+
+  <div class="row">
+      <div class="col-md-12">
+        <h3>Encryption Info <span class="label label-primary"></span></h3>
+        <div class="table-responsive">
+          <table class="table table-hover">
+            <tbody>
+                <tr>
+                  <td>Client to Bit Encryption:</td>
+                  <td>${model.isUserEncryptionEnabled()?string("enabled", "disabled")}</td>
+                </tr>
+                <tr>
+                  <td>Bit to Bit Encryption:</td>
+                  <td>${model.isBitEncryptionEnabled()?string("enabled", "disabled")}</td>
+                </tr>
+            </tbody>
+          </table>
+        </div>
+      </div>
   </div>
 </#macro>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index c3cc2da..a3ea198 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -48,88 +49,45 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.hadoop.security.UgiTestUtil;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.apache.kerby.kerberos.kerb.KrbException;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.nio.file.Files;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertTrue;
 
 @Ignore("See DRILL-5387")
 public class TestBitBitKerberos extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitBitKerberos.class);
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitBitKerberos.class);
 
+  private static KerberosHelper krbHelper;
   private static DrillConfig newConfig;
 
-  private static File workspace;
+  private static BootStrapContext c1;
+  private static FragmentManager manager;
+  private int port = 1234;
 
-  private static File kdcDir;
-  private static SimpleKdcServer kdc;
-  private static int kdcPort;
-
-  private static final String HOSTNAME = "localhost";
-  private static final String REALM = "EXAMPLE.COM";
-
-  private static final String SERVER_SHORT_NAME = System.getProperty("user.name");
-  private static final String SERVER_PRINCIPAL = SERVER_SHORT_NAME + "/" + HOSTNAME + "@" + REALM;
-
-  private static File keytabDir;
-  private static File serverKeytab;
-
-  private static boolean kdcStarted;
-
-  @SuppressWarnings("restriction")
   @BeforeClass
-  public static void setupKdc() throws Exception {
-    kdc = new SimpleKdcServer();
-    workspace = new File(getTempDir("kerberos_target"));
-
-    kdcDir = new File(workspace, TestBitBitKerberos.class.getSimpleName());
-    kdcDir.mkdirs();
-    kdc.setWorkDir(kdcDir);
-
-    kdc.setKdcHost(HOSTNAME);
-    kdcPort = getFreePort();
-    kdc.setAllowTcp(true);
-    kdc.setAllowUdp(false);
-    kdc.setKdcTcpPort(kdcPort);
-
-    logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
-
-    kdc.init();
-    kdc.start();
-    kdcStarted = true;
+  public static void setupTest() throws Exception {
 
     final Config config = DrillConfig.create(cloneDefaultTestConfigProperties());
-    keytabDir = new File(workspace, TestBitBitKerberos.class.getSimpleName() + "_keytabs");
-    keytabDir.mkdirs();
-    setupUsers(keytabDir);
-
-    // Kerby sets "java.security.krb5.conf" for us!
-    System.clearProperty("java.security.auth.login.config");
-    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
-    // Uncomment the following lines for debugging.
-    // System.setProperty("sun.security.spnego.debug", "true");
-    // System.setProperty("sun.security.krb5.debug", "true");
+    krbHelper = new KerberosHelper(TestBitBitKerberos.class.getSimpleName());
+    krbHelper.setupKdc();
 
     newConfig = new DrillConfig(
         config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
@@ -141,9 +99,9 @@ public class TestBitBitKerberos extends BaseTestQuery {
         .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
             ConfigValueFactory.fromAnyRef(true))
         .withValue(BootStrapContext.SERVICE_PRINCIPAL,
-            ConfigValueFactory.fromAnyRef(SERVER_PRINCIPAL))
+            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
         .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
-            ConfigValueFactory.fromAnyRef(serverKeytab.toString())),
+            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
         false);
 
     // Ignore the compile time warning caused by the code below.
@@ -159,65 +117,13 @@ public class TestBitBitKerberos extends BaseTestQuery {
     defaultRealm.set(null, KerberosUtil.getDefaultRealm());
 
     updateTestCluster(1, newConfig);
-  }
-
-  private static int getFreePort() throws IOException {
-    @SuppressWarnings("resource")
-    ServerSocket s = null;
-    try {
-      s = new ServerSocket(0);
-      s.setReuseAddress(true);
-      return s.getLocalPort();
-    } finally {
-      if (s != null) {
-        s.close();
-      }
-    }
-  }
-
-  private static void setupUsers(File keytabDir) throws KrbException {
-    // Create the server user
-    String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
-    serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
-    logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
-    setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
-  }
-
-  private static void setupUser(SimpleKdcServer kdc, File keytab, String principal)
-      throws KrbException {
-    kdc.createPrincipal(principal);
-    kdc.exportPrincipal(principal, keytab);
-  }
-
-  @AfterClass
-  public static void stopKdc() throws Exception {
-    if (kdcStarted) {
-      logger.info("Stopping KDC on {}", kdcPort);
-      kdc.stop();
-    }
-
-    deleteIfExists(serverKeytab);
-    deleteIfExists(keytabDir);
-    deleteIfExists(kdcDir);
-    deleteIfExists(workspace);
-    UgiTestUtil.resetUgi();
-  }
-
-  private static void deleteIfExists(File file) throws IOException {
-    if (file != null) {
-      Files.deleteIfExists(file.toPath());
-    }
-  }
-
-  @Test
-  public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
 
     ScanResult result = ClassPathScanner.fromPrescan(newConfig);
-    @SuppressWarnings("resource")
-    final BootStrapContext c1 = new BootStrapContext(newConfig, result);
-    @SuppressWarnings({ "unused", "resource" })
-    final BootStrapContext c2 = new BootStrapContext(newConfig, result);
+    c1 = new BootStrapContext(newConfig, result);
+    setupFragmentContextAndManager();
+  }
 
+  private static void setupFragmentContextAndManager() {
     final FragmentContext fcontext = new MockUp<FragmentContext>(){
       @SuppressWarnings("unused")
       BufferAllocator getAllocator(){
@@ -225,7 +131,7 @@ public class TestBitBitKerberos extends BaseTestQuery {
       }
     }.getMockInstance();
 
-    final FragmentManager manager = new MockUp<FragmentManager>(){
+    manager = new MockUp<FragmentManager>(){
       int v = 0;
 
       @Mock
@@ -252,36 +158,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
       }
 
     }.getMockInstance();
-
-
-    new NonStrictExpectations() {{
-      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
-      workBus.getFragmentManager( (FragmentHandle) any); result = manager;
-    }};
-
-    int port = 1234;
-
-    DataConnectionConfig config = new DataConnectionConfig(c1.getAllocator(), c1,
-        new DataServerRequestHandler(workBus, bee));
-    @SuppressWarnings("resource")
-    DataServer server = new DataServer(config);
-
-    port = server.bind(port, true);
-    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
-    @SuppressWarnings("resource")
-    DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
-    DataTunnel tunnel = new DataTunnel(connectionManager);
-    AtomicLong max = new AtomicLong(0);
-    for (int i = 0; i < 40; i++) {
-      long t1 = System.currentTimeMillis();
-      tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
-          1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
-      System.out.println(System.currentTimeMillis() - t1);
-      // System.out.println("sent.");
-    }
-    System.out.println(String.format("Max time: %d", max.get()));
-    assertTrue(max.get() > 2700);
-    Thread.sleep(5000);
   }
 
   private static WritableBatch getRandomBatch(BufferAllocator allocator, int records) {
@@ -289,8 +165,8 @@ public class TestBitBitKerberos extends BaseTestQuery {
     for (int i = 0; i < 5; i++) {
       @SuppressWarnings("resource")
       Float8Vector v = (Float8Vector) TypeHelper.getNewVector(
-          MaterializedField.create("a", Types.required(MinorType.FLOAT8)),
-          allocator);
+        MaterializedField.create("a", Types.required(MinorType.FLOAT8)),
+        allocator);
       v.allocateNew(records);
       v.getMutator().generateTestData(records);
       vectors.add(v);
@@ -302,7 +178,7 @@ public class TestBitBitKerberos extends BaseTestQuery {
     private AtomicLong max;
     private Stopwatch watch = Stopwatch.createStarted();
 
-    public TimingOutcome(AtomicLong max) {
+    TimingOutcome(AtomicLong max) {
       super();
       this.max = max;
     }
@@ -315,7 +191,8 @@ public class TestBitBitKerberos extends BaseTestQuery {
     @Override
     public void success(Ack value, ByteBuf buffer) {
       long micros = watch.elapsed(TimeUnit.MILLISECONDS);
-      System.out.println(String.format("Total time to send: %d, start time %d", micros, System.currentTimeMillis() - micros));
+      System.out.println(String.format("Total time to send: %d, start time %d", micros,
+          System.currentTimeMillis() - micros));
       while (true) {
         long nowMax = max.get();
         if (nowMax < micros) {
@@ -333,4 +210,161 @@ public class TestBitBitKerberos extends BaseTestQuery {
       // TODO(We don't have any interrupts in test code)
     }
   }
+
+  @Test
+  public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
+
+    new NonStrictExpectations() {{
+      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
+      workBus.getFragmentManager( (FragmentHandle) any); result = manager;
+    }};
+
+    DataConnectionConfig config = new DataConnectionConfig(c1.getAllocator(), c1,
+        new DataServerRequestHandler(workBus, bee));
+    DataServer server = new DataServer(config);
+
+    port = server.bind(port, true);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
+    DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
+    DataTunnel tunnel = new DataTunnel(connectionManager);
+    AtomicLong max = new AtomicLong(0);
+    for (int i = 0; i < 40; i++) {
+      long t1 = System.currentTimeMillis();
+      tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
+          1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
+      System.out.println(System.currentTimeMillis() - t1);
+      // System.out.println("sent.");
+    }
+    System.out.println(String.format("Max time: %d", max.get()));
+    assertTrue(max.get() > 2700);
+    Thread.sleep(5000);
+  }
+
+  @Test
+  public void successEncryption(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
+
+    newConfig = new DrillConfig(
+      config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("kerberos")))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+          ConfigValueFactory.fromAnyRef("kerberos"))
+        .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
+      false);
+
+    updateTestCluster(1, newConfig);
+
+    new NonStrictExpectations() {{
+      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
+      workBus.getFragmentManager( (FragmentHandle) any); result = manager;
+    }};
+
+    DataConnectionConfig config = new DataConnectionConfig(c1.getAllocator(), c1,
+      new DataServerRequestHandler(workBus, bee));
+    DataServer server = new DataServer(config);
+
+    port = server.bind(port, true);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
+    DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
+    DataTunnel tunnel = new DataTunnel(connectionManager);
+    AtomicLong max = new AtomicLong(0);
+    for (int i = 0; i < 40; i++) {
+      long t1 = System.currentTimeMillis();
+      tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
+        1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
+      System.out.println(System.currentTimeMillis() - t1);
+    }
+    System.out.println(String.format("Max time: %d", max.get()));
+    assertTrue(max.get() > 2700);
+    Thread.sleep(5000);
+  }
+
+  @Test
+  public void successEncryptionChunkMode(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus)
+    throws Exception {
+    newConfig = new DrillConfig(
+      config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("kerberos")))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+          ConfigValueFactory.fromAnyRef("kerberos"))
+        .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
+          ConfigValueFactory.fromAnyRef(100000))
+        .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(true))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+          ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+          ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
+      false);
+
+    updateTestCluster(1, newConfig);
+
+    new NonStrictExpectations() {{
+      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
+      workBus.getFragmentManager( (FragmentHandle) any); result = manager;
+    }};
+
+    DataConnectionConfig config = new DataConnectionConfig(c1.getAllocator(), c1,
+      new DataServerRequestHandler(workBus, bee));
+    DataServer server = new DataServer(config);
+
+    port = server.bind(port, true);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
+    DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
+    DataTunnel tunnel = new DataTunnel(connectionManager);
+    AtomicLong max = new AtomicLong(0);
+    for (int i = 0; i < 40; i++) {
+      long t1 = System.currentTimeMillis();
+      tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
+        1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
+      System.out.println(System.currentTimeMillis() - t1);
+    }
+    System.out.println(String.format("Max time: %d", max.get()));
+    assertTrue(max.get() > 2700);
+    Thread.sleep(5000);
+  }
+
+  @Test
+  public void failureEncryptionOnlyPlainMechanism() throws Exception {
+    try{
+      newConfig = new DrillConfig(
+        config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+          ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+            ConfigValueFactory.fromAnyRef("kerberos"))
+          .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(true))
+          .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+          .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
+        false);
+
+      updateTestCluster(1, newConfig);
+      fail();
+    } catch(Exception ex) {
+      assertTrue(ex.getCause() instanceof DrillbitStartupException);
+    }
+  }
+
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    krbHelper.stopKdc();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
index b3c15bd..bdc3230 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
@@ -17,19 +17,13 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import static org.junit.Assert.assertTrue;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import mockit.Injectable;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.NonStrictExpectations;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -57,8 +51,12 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.junit.Test;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertTrue;
 
 public class TestBitRpc extends ExecTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
new file mode 100644
index 0000000..3320cef
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+
+import static org.apache.drill.exec.ExecTest.getTempDir;
+
+public class KerberosHelper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KerberosHelper.class);
+
+  public File workspace;
+
+  private File kdcDir;
+  private SimpleKdcServer kdc;
+  private int kdcPort;
+
+  private final String HOSTNAME = "localhost";
+
+  public final String CLIENT_SHORT_NAME = "testUser";
+  public final String CLIENT_PRINCIPAL;
+
+  public String SERVER_PRINCIPAL;
+  private final String testName;
+
+  private File keytabDir;
+  public File clientKeytab;
+  public File serverKeytab;
+
+  private boolean kdcStarted;
+
+  public KerberosHelper(final String testName) {
+    final String realm = "EXAMPLE.COM";
+    CLIENT_PRINCIPAL = CLIENT_SHORT_NAME + "@" + realm;
+    final String serverShortName = System.getProperty("user.name");
+    SERVER_PRINCIPAL = serverShortName + "/" + HOSTNAME + "@" + realm;
+    this.testName = testName;
+  }
+
+  public void setupKdc() throws Exception {
+    kdc = new SimpleKdcServer();
+    workspace = new File(getTempDir("kerberos_target"));
+
+    kdcDir = new File(workspace, testName);
+    if(!kdcDir.mkdirs()) {
+      throw new Exception(String.format("Failed to create the kdc directory %s", kdcDir.getName()));
+    }
+    kdc.setWorkDir(kdcDir);
+
+    kdc.setKdcHost(HOSTNAME);
+    kdcPort = getFreePort();
+    kdc.setAllowTcp(true);
+    kdc.setAllowUdp(false);
+    kdc.setKdcTcpPort(kdcPort);
+
+    logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
+
+    kdc.init();
+    kdc.start();
+    kdcStarted = true;
+
+
+    keytabDir = new File(workspace, testName + "_keytabs");
+    if(!keytabDir.mkdirs()) {
+      throw new Exception(String.format("Failed to create the keytab directory %s", keytabDir.getName()));
+    }
+    setupUsers(keytabDir);
+
+    // Kerby sets "java.security.krb5.conf" for us!
+    System.clearProperty("java.security.auth.login.config");
+    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+    // Uncomment the following lines for debugging.
+    // System.setProperty("sun.security.spnego.debug", "true");
+    // System.setProperty("sun.security.krb5.debug", "true");
+  }
+
+  private int getFreePort() throws IOException {
+    ServerSocket s = null;
+    try {
+      s = new ServerSocket(0);
+      s.setReuseAddress(true);
+      return s.getLocalPort();
+    } finally {
+      if (s != null) {
+        s.close();
+      }
+    }
+  }
+
+  private void setupUsers(File keytabDir) throws KrbException {
+    // Create the client user
+    String clientPrincipal = CLIENT_PRINCIPAL.substring(0, CLIENT_PRINCIPAL.indexOf('@'));
+    clientKeytab = new File(keytabDir, clientPrincipal.replace('/', '_') + ".keytab");
+    logger.debug("Creating {} with keytab {}", clientPrincipal, clientKeytab);
+    setupUser(kdc, clientKeytab, clientPrincipal);
+
+    // Create the server user
+    String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
+    serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
+    logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
+    setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
+  }
+
+  private void setupUser(SimpleKdcServer kdc, File keytab, String principal)
+    throws KrbException {
+    kdc.createPrincipal(principal);
+    kdc.exportPrincipal(principal, keytab);
+  }
+
+  public void stopKdc() throws Exception {
+    if (kdcStarted) {
+      logger.info("Stopping KDC on {}", kdcPort);
+      kdc.stop();
+    }
+
+    deleteIfExists(clientKeytab);
+    deleteIfExists(serverKeytab);
+    deleteIfExists(keytabDir);
+    deleteIfExists(kdcDir);
+    deleteIfExists(workspace);
+  }
+
+  private void deleteIfExists(File file) throws IOException {
+    if (file != null) {
+      Files.deleteIfExists(file.toPath());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index 177268c..3fad005 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -23,99 +23,48 @@ import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
 import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
 import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.hadoop.security.UgiTestUtil;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.apache.kerby.kerberos.kerb.KrbException;
 import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.security.auth.Subject;
-import java.io.File;
-import java.io.IOException;
 import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.nio.file.Files;
 import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
 
 @Ignore("See DRILL-5387")
 public class TestUserBitKerberos extends BaseTestQuery {
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(TestUserBitKerberos.class);
+  //private static final org.slf4j.Logger logger =org.slf4j.LoggerFactory.getLogger(TestUserBitKerberos.class);
 
-  private static File workspace;
-
-  private static File kdcDir;
-  private static SimpleKdcServer kdc;
-  private static int kdcPort;
-
-  private static final String HOSTNAME = "localhost";
-  private static final String REALM = "EXAMPLE.COM";
-
-  private static final String CLIENT_SHORT_NAME = "testUser";
-  private static final String CLIENT_PRINCIPAL = CLIENT_SHORT_NAME + "@" + REALM;
-  private static final String SERVER_SHORT_NAME = System.getProperty("user.name");
-  private static final String SERVER_PRINCIPAL = SERVER_SHORT_NAME + "/" + HOSTNAME + "@" + REALM;
-
-  private static File keytabDir;
-  private static File clientKeytab;
-  private static File serverKeytab;
-
-  private static boolean kdcStarted;
+  private static KerberosHelper krbHelper;
 
   @BeforeClass
-  public static void setupKdc() throws Exception {
-    kdc = new SimpleKdcServer();
-    workspace = new File(getTempDir("kerberos_target"));
-
-    kdcDir = new File(workspace, TestUserBitKerberos.class.getSimpleName());
-    kdcDir.mkdirs();
-    kdc.setWorkDir(kdcDir);
-
-    kdc.setKdcHost(HOSTNAME);
-    kdcPort = getFreePort();
-    kdc.setAllowTcp(true);
-    kdc.setAllowUdp(false);
-    kdc.setKdcTcpPort(kdcPort);
+  public static void setupTest() throws Exception {
 
-    logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
-
-    kdc.init();
-    kdc.start();
-    kdcStarted = true;
-
-
-    keytabDir = new File(workspace, TestUserBitKerberos.class.getSimpleName()
-        + "_keytabs");
-    keytabDir.mkdirs();
-    setupUsers(keytabDir);
-
-    // Kerby sets "java.security.krb5.conf" for us!
-    System.clearProperty("java.security.auth.login.config");
-    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
-    // Uncomment the following lines for debugging.
-    // System.setProperty("sun.security.spnego.debug", "true");
-    // System.setProperty("sun.security.krb5.debug", "true");
+    krbHelper = new KerberosHelper(TestUserBitKerberos.class.getSimpleName());
+    krbHelper.setupKdc();
 
+    // Create a new DrillConfig which has user authentication enabled and authenticator set to
+    // UserAuthenticatorTestImpl.
     final DrillConfig newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
-        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
-            ConfigValueFactory.fromAnyRef(true))
-        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
-            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
-        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
-            ConfigValueFactory.fromAnyRef(SERVER_PRINCIPAL))
-        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
-            ConfigValueFactory.fromAnyRef(serverKeytab.toString()))
-        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
-            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))),
-        false);
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))),
+      false);
 
     final Properties connectionProps = new Properties();
     connectionProps.setProperty(DrillProperties.USER, "anonymous");
@@ -136,66 +85,12 @@ public class TestUserBitKerberos extends BaseTestQuery {
     updateTestCluster(1, newConfig, connectionProps);
   }
 
-  private static int getFreePort() throws IOException {
-    ServerSocket s = null;
-    try {
-      s = new ServerSocket(0);
-      s.setReuseAddress(true);
-      return s.getLocalPort();
-    } finally {
-      if (s != null) {
-        s.close();
-      }
-    }
-  }
-
-  private static void setupUsers(File keytabDir) throws KrbException {
-    // Create the client user
-    String clientPrincipal = CLIENT_PRINCIPAL.substring(0, CLIENT_PRINCIPAL.indexOf('@'));
-    clientKeytab = new File(keytabDir, clientPrincipal.replace('/', '_') + ".keytab");
-    logger.debug("Creating {} with keytab {}", clientPrincipal, clientKeytab);
-    setupUser(kdc, clientKeytab, clientPrincipal);
-
-    // Create the server user
-    String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
-    serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
-    logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
-    setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
-  }
-
-  private static void setupUser(SimpleKdcServer kdc, File keytab, String principal)
-      throws KrbException {
-    kdc.createPrincipal(principal);
-    kdc.exportPrincipal(principal, keytab);
-  }
-
-  @AfterClass
-  public static void stopKdc() throws Exception {
-    if (kdcStarted) {
-      logger.info("Stopping KDC on {}", kdcPort);
-      kdc.stop();
-    }
-
-    deleteIfExists(clientKeytab);
-    deleteIfExists(serverKeytab);
-    deleteIfExists(keytabDir);
-    deleteIfExists(kdcDir);
-    deleteIfExists(workspace);
-    UgiTestUtil.resetUgi();
-  }
-
-  private static void deleteIfExists(File file) throws IOException {
-    if (file != null) {
-      Files.deleteIfExists(file.toPath());
-    }
-  }
-
   @Test
   public void successKeytab() throws Exception {
     final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, SERVER_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.USER, CLIENT_PRINCIPAL);
-    connectionProps.setProperty(DrillProperties.KEYTAB, clientKeytab.getAbsolutePath());
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
     updateClient(connectionProps);
 
     // Run few queries using the new client
@@ -203,7 +98,7 @@ public class TestUserBitKerberos extends BaseTestQuery {
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
-        .baselineValues(CLIENT_SHORT_NAME)
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
     test("SHOW SCHEMAS");
     test("USE INFORMATION_SCHEMA");
@@ -215,9 +110,10 @@ public class TestUserBitKerberos extends BaseTestQuery {
   @Test
   public void successTicket() throws Exception {
     final Properties connectionProps = new Properties();
-    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
     connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
-    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(CLIENT_PRINCIPAL, clientKeytab.getAbsoluteFile());
+    final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
+      krbHelper.clientKeytab.getAbsoluteFile());
 
     Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
       @Override
@@ -232,7 +128,7 @@ public class TestUserBitKerberos extends BaseTestQuery {
         .sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
         .unOrdered()
         .baselineColumns("session_user")
-        .baselineValues(CLIENT_SHORT_NAME)
+        .baselineValues(krbHelper.CLIENT_SHORT_NAME)
         .go();
     test("SHOW SCHEMAS");
     test("USE INFORMATION_SCHEMA");
@@ -240,4 +136,17 @@ public class TestUserBitKerberos extends BaseTestQuery {
     test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
     test("SELECT * FROM cp.`region.json` LIMIT 5");
   }
+
+  /**
+   * Class-level teardown: asks the shared {@code krbHelper} to stop the KDC it set up for these tests.
+   *
+   * @throws Exception if {@code krbHelper.stopKdc()} fails
+   */
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    // krbHelper is only assigned during class setup; presumably it is still null if that step failed
+    // before the assignment, so skip the call rather than pile an NPE on top of the real failure.
+    if (krbHelper != null) {
+      krbHelper.stopKdc();
+    }
+  }
 }