You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/03/12 11:54:36 UTC

[3/5] drill git commit: DRILL-6187: Exception in RPC communication between DataClient/ControlClient and respective servers when bit-to-bit security is on

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index aa26fd6..640eb40 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -111,7 +111,22 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
     connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
     connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
     connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-    updateClient(connectionProps);
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true)));
+
+    updateTestCluster(1, newConfig, connectionProps);
 
     // Run few queries using the new client
     testBuilder()
@@ -145,7 +160,22 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
     connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
     connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
     connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
-    updateClient(connectionProps);
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true)));
+
+    updateTestCluster(1, newConfig, connectionProps);
 
     assertTrue(UserRpcMetrics.getInstance().getEncryptedConnectionCount() == 1);
     assertTrue(UserRpcMetrics.getInstance().getUnEncryptedConnectionCount() == 0);
@@ -177,10 +207,24 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
     final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
                                                                krbHelper.clientKeytab.getAbsoluteFile());
 
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+      .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+        ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+      .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+        ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+      .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+        ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+      .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+        ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+      .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+        ConfigValueFactory.fromAnyRef(true)));
+
     Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        updateClient(connectionProps);
+        updateTestCluster(1, newConfig, connectionProps);
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 34f75f4..f86d698 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.math3.util.Pair;
 import org.apache.drill.exec.work.foreman.FragmentsRunner;
@@ -205,7 +206,7 @@ public class TestDrillbitResilience extends DrillTest {
 
     // create a client
     final DrillConfig drillConfig = zkHelper.getConfig();
-    drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, null);
+    drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties());
     clearAllInjections();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
index 94c8ebf..1098dc4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -87,7 +87,7 @@ public class TestResourceLeak extends DrillTest {
 
     bit = new Drillbit(config, serviceSet);
     bit.run();
-    client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+    client = QueryTestUtil.createClient(config, serviceSet, 2, new Properties());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/rpc/pom.xml b/exec/rpc/pom.xml
index ea56574..5ed62ee 100644
--- a/exec/rpc/pom.xml
+++ b/exec/rpc/pom.xml
@@ -77,6 +77,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0f4ef1b..4395db3 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.rpc;
 
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -32,16 +36,17 @@ import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-
-import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -69,6 +74,9 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
   private final IdlePingHandler pingHandler;
   private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null;
 
+  // Determines if authentication is completed between client and server
+  private boolean authComplete = true;
+
   public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
                      Class<HR> responseClass, Parser<HR> handshakeParser) {
     super(rpcMapping);
@@ -133,6 +141,19 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
     return false;
   }
 
+  /**
+   * Sets the state for authentication complete.
+   * @param authComplete - state to set. True means authentication between client and server is completed, false
+   *                     means authentication is in progress.
+   */
+  protected void setAuthComplete(boolean authComplete) {
+    this.authComplete = authComplete;
+  }
+
+  protected boolean isAuthComplete() {
+    return authComplete;
+  }
+
   // Save the SslChannel after the SSL handshake so it can be closed later
   public void setSslChannel(Channel c) {
 
@@ -180,7 +201,67 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
     return (connection != null) && connection.isActive();
   }
 
-  protected abstract void validateHandshake(HR validateHandshake) throws RpcException;
+  protected abstract List<String> validateHandshake(HR validateHandshake) throws RpcException;
+
+  /**
+   * Creates various instances needed to start the SASL handshake. This is called once
+   * {@link BasicClient#validateHandshake(MessageLite)} reports that the server side requires authentication.
+   * @param connectionHandler - Connection handler used by clients to know about success/failure conditions.
+   * @param serverAuthMechanisms - List of auth mechanisms configured on server side
+   */
+  protected abstract void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
+                                               List<String> serverAuthMechanisms) throws RpcException;
+
+  /**
+   * Main method which starts the SASL handshake for all client channels (user/data/control) once it is determined,
+   * after the regular RPC handshake, that authentication is required by the server side. Only once authentication
+   * has completed is the underlying channel made available to clients to send other RPC messages. Success and
+   * failure events are reported to the connection handler on which the client waits.
+   * @param connectionHandler - Connection handler used by clients to learn about success/failure conditions.
+   * @param saslProperties - SASL related properties needed to create SASL client.
+   * @param ugi - UserGroupInformation with logged in client side user
+   * @param authFactory - Authentication factory to use for this SASL handshake.
+   * @param rpcType - SASL_MESSAGE rpc type.
+   */
+  protected void startSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
+                                    Map<String, ?> saslProperties, UserGroupInformation ugi,
+                                    AuthenticatorFactory authFactory, T rpcType) {
+    final String mechanismName = authFactory.getSimpleName();
+    try {
+      final SaslClient saslClient = authFactory.createSaslClient(ugi, saslProperties);
+      if (saslClient == null) {
+        final Exception ex = new SaslException(String.format("Cannot initiate authentication using %s mechanism. " +
+          "Insufficient credentials or selected mechanism doesn't support configured security layers?", mechanismName));
+        connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+        return;
+      }
+      connection.setSaslClient(saslClient);
+    } catch (final SaslException e) {
+      logger.error("Failed while creating SASL client for SASL handshake for connection {}", connection.getName(), e);
+      connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, e);
+      return;
+    }
+
+    logger.debug("Initiating SASL exchange.");
+    new AuthenticationOutcomeListener<>(this, connection, rpcType, ugi,
+      new RpcOutcomeListener<Void>() {
+      @Override
+      public void failed(RpcException ex) {
+        connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+      }
+
+      @Override
+      public void success(Void value, ByteBuf buffer) {
+        authComplete = true;
+        connectionHandler.connectionSucceeded(connection);
+      }
+
+      @Override
+      public void interrupted(InterruptedException ex) {
+        connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+      }
+    }).initiate(mechanismName);
+  }
 
   protected void finalizeConnection(HR handshake, CC connection) {
     // no-op
@@ -204,12 +285,6 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
         allowInEventLoop, dataBodies);
   }
 
-  // the command itself must be "run" by the caller (to avoid calling inEventLoop)
-  protected <M extends MessageLite> RpcCommand<M, CC>
-  getInitialCommand(final RpcCommand<M, CC> command) {
-    return command;
-  }
-
   protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
                                  String host, int port) {
     ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml;

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
index 0cdca13..3fee5d7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.DrillException;
 import org.slf4j.Logger;
 
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -151,12 +152,21 @@ public class ConnectionMultiListener<T extends EnumLite, CC extends ClientConnec
     public void success(HR value, ByteBuf buffer) {
       // logger.debug("Handshake received. {}", value);
       try {
-        parent.validateHandshake(value);
+        final List<String> serverAuthMechanisms = parent.validateHandshake(value);
         parent.finalizeConnection(value, parent.connection);
-        connectionListener.connectionSucceeded(parent.connection);
-        // logger.debug("Handshake completed succesfully.");
+
+        // If auth is required then start the SASL handshake
+        if (serverAuthMechanisms != null) {
+          parent.prepareSaslHandshake(connectionListener, serverAuthMechanisms);
+        } else {
+          connectionListener.connectionSucceeded(parent.connection);
+          logger.debug("Handshake completed successfully.");
+        }
+      } catch (NonTransientRpcException ex) {
+        logger.error("Failure while validating client and server sasl compatibility", ex);
+        connectionListener.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
       } catch (Exception ex) {
-        logger.debug("Failure while validating handshake", ex);
+        logger.error("Failure while validating handshake", ex);
         connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index a64a23b..3936170 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -78,7 +78,7 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte
       } else {
 //        logger.debug("No connection active, opening client connection.");
         BasicClient<?, C, HS, ?> client = getNewClient();
-        ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(client.getInitialCommand(cmd));
+        ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(cmd);
         client.connectAsClient(future, handshake, host, port);
         future.waitAndRun();
 //        logger.debug("Connection available and active, command now being run inline.");

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
new file mode 100644
index 0000000..5c34d01
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the client-side.
+ *
+ * @param <T> handshake rpc type
+ * @param <C> Client connection type
+ * @param <HS> Handshake send type
+ * @param <HR> Handshake receive type
+ */
+public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientConnection,
+    HS extends MessageLite, HR extends MessageLite>
+    implements RpcOutcomeListener<SaslMessage> {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
+
+  private static final ImmutableMap<SaslStatus, SaslChallengeProcessor>
+      CHALLENGE_PROCESSORS;
+  static {
+    final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
+    map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+    map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+    map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+    CHALLENGE_PROCESSORS = Maps.immutableEnumMap(map);
+  }
+
+  private final BasicClient<T, C, HS, HR> client;
+  private final C connection;
+  private final T saslRpcType;
+  private final UserGroupInformation ugi;
+  private final RpcOutcomeListener<?> completionListener;
+
+  public AuthenticationOutcomeListener(BasicClient<T, C, HS, HR> client,
+                                       C connection, T saslRpcType, UserGroupInformation ugi,
+                                       RpcOutcomeListener<?> completionListener) {
+    this.client = client;
+    this.connection = connection;
+    this.saslRpcType = saslRpcType;
+    this.ugi = ugi;
+    this.completionListener = completionListener;
+  }
+
+  public void initiate(final String mechanismName) {
+    logger.trace("Initiating SASL exchange.");
+    try {
+      final ByteString responseData;
+      final SaslClient saslClient = connection.getSaslClient();
+      if (saslClient.hasInitialResponse()) {
+        responseData = ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0]));
+      } else {
+        responseData = ByteString.EMPTY;
+      }
+      client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+          connection,
+          saslRpcType,
+          SaslMessage.newBuilder()
+              .setMechanism(mechanismName)
+              .setStatus(SaslStatus.SASL_START)
+              .setData(responseData)
+              .build(),
+          SaslMessage.class,
+          true /* the connection will not be backed up at this point */);
+      logger.trace("Initiated SASL exchange.");
+    } catch (final Exception e) {
+      completionListener.failed(RpcException.mapException(e));
+    }
+  }
+
+  @Override
+  public void failed(RpcException ex) {
+    completionListener.failed(RpcException.mapException(ex));
+  }
+
+  @Override
+  public void success(SaslMessage value, ByteBuf buffer) {
+    logger.trace("Server responded with message of type: {}", value.getStatus());
+    final SaslChallengeProcessor processor = CHALLENGE_PROCESSORS.get(value.getStatus());
+    if (processor == null) {
+      completionListener.failed(RpcException.mapException(
+          new SaslException("Server sent a corrupt message.")));
+    } else {
+      // SaslSuccessProcessor.process disposes saslClient so get mechanism here to use later in logging
+      final String mechanism = connection.getSaslClient().getMechanismName();
+      try {
+        final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection);
+        final SaslMessage saslResponse = processor.process(context);
+
+        if (saslResponse != null) {
+          client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+              connection, saslRpcType, saslResponse, SaslMessage.class,
+              true /* the connection will not be backed up at this point */);
+        } else {
+          // success
+          completionListener.success(null, null);
+          if (logger.isTraceEnabled()) {
+            logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}",
+                mechanism, connection.getEncryptionCtxtString());
+          }
+        }
+      } catch (final Exception e) {
+        logger.error("Authentication with encryption context: {} using mechanism {} failed with {}",
+            connection.getEncryptionCtxtString(), mechanism, e.getMessage());
+        completionListener.failed(RpcException.mapException(e));
+      }
+    }
+  }
+
+  @Override
+  public void interrupted(InterruptedException e) {
+    completionListener.interrupted(e);
+  }
+
+  private static class SaslChallengeContext<C extends ClientConnection> {
+
+    final SaslMessage challenge;
+    final UserGroupInformation ugi;
+    final C connection;
+
+    SaslChallengeContext(SaslMessage challenge, UserGroupInformation ugi, C connection) {
+      this.challenge = checkNotNull(challenge);
+      this.ugi = checkNotNull(ugi);
+      this.connection = checkNotNull(connection);
+    }
+  }
+
+  private interface SaslChallengeProcessor {
+
+    /**
+     * Process challenge from server, and return a response.
+     *
+     * Returns null iff SASL exchange is complete and successful.
+     *
+     * @param context challenge context
+     * @return response
+     * @throws Exception in case of any failure
+     */
+    <CC extends ClientConnection>
+    SaslMessage process(SaslChallengeContext<CC> context) throws Exception;
+
+  }
+
+  private static class SaslInProgressProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+      final SaslMessage.Builder response = SaslMessage.newBuilder();
+      final SaslClient saslClient = context.connection.getSaslClient();
+
+      final byte[] responseBytes = evaluateChallenge(context.ugi, saslClient,
+          context.challenge.getData().toByteArray());
+
+      final boolean isComplete = saslClient.isComplete();
+      logger.trace("Evaluated challenge. Completed? {}.", isComplete);
+      response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY);
+      // if isComplete, the client will get one more response from server
+      response.setStatus(isComplete ? SaslStatus.SASL_SUCCESS : SaslStatus.SASL_IN_PROGRESS);
+      return response.build();
+    }
+  }
+
+  private static class SaslSuccessProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+      final SaslClient saslClient = context.connection.getSaslClient();
+
+      if (saslClient.isComplete()) {
+        handleSuccess(context);
+        return null;
+      } else {
+        // server completed before client; so try once, fail otherwise
+        evaluateChallenge(context.ugi, saslClient, context.challenge.getData().toByteArray()); // discard response
+
+        if (saslClient.isComplete()) {
+          handleSuccess(context);
+          return null;
+        } else {
+          throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
+        }
+      }
+    }
+  }
+
+  private static class SaslFailedProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+      throw new SaslException(String.format("Authentication failed. Incorrect credentials? [Details: %s]",
+          context.connection.getEncryptionCtxtString()));
+    }
+  }
+
+  private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient,
+                                          final byte[] challengeBytes) throws SaslException {
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+        @Override
+        public byte[] run() throws Exception {
+          return saslClient.evaluateChallenge(challengeBytes);
+        }
+      });
+    } catch (final UndeclaredThrowableException e) {
+      throw new SaslException(
+          String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e.getCause());
+    } catch (final IOException | InterruptedException e) {
+      if (e instanceof SaslException) {
+        throw (SaslException) e;
+      } else {
+        throw new SaslException(
+            String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e);
+      }
+    }
+  }
+
+
+  private static <CC extends ClientConnection> void handleSuccess(SaslChallengeContext<CC> context) throws
+      SaslException {
+    final CC connection = context.connection;
+    final SaslClient saslClient = connection.getSaslClient();
+
+    try {
+      // Check if connection was marked for being secure then verify for negotiated QOP value for
+      // correctness.
+      final String negotiatedQOP = saslClient.getNegotiatedProperty(Sasl.QOP).toString();
+      final String expectedQOP = connection.isEncryptionEnabled()
+          ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+          : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+      if (!(negotiatedQOP.equals(expectedQOP))) {
+        throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+            negotiatedQOP, expectedQOP));
+      }
+
+      // Update the rawWrapChunkSize with the negotiated buffer size since we cannot call encode with more than
+      // negotiated size of buffer.
+      if (connection.isEncryptionEnabled()) {
+        final int negotiatedRawSendSize = Integer.parseInt(
+            saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+        if (negotiatedRawSendSize <= 0) {
+          throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+              "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+              negotiatedRawSendSize));
+        }
+        connection.setWrapSizeLimit(negotiatedRawSendSize);
+      }
+    } catch (Exception e) {
+      throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+          e.getMessage()), e);
+    }
+
+    if (connection.isEncryptionEnabled()) {
+      connection.addSecurityHandlers();
+    } else {
+      // Encryption is not required hence we don't need to hold on to saslClient object.
+      connection.disposeSaslClient();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
new file mode 100644
index 0000000..307ae97
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An implementation of this factory will be initialized once at startup, if the authenticator is enabled
+ * (see {@link #getSimpleName}). For every request for this mechanism (i.e. after establishing a connection),
+ * {@link #createSaslServer} will be invoked on the server-side and {@link #createSaslClient} will be invoked
+ * on the client-side.
+ *
+ * <p>Note:
+ * + Custom authenticators must have a default constructor.
+ *
+ * <p>Examples: PlainFactory and KerberosFactory.
+ */
+public interface AuthenticatorFactory extends AutoCloseable {
+
+  /**
+   * Returns the name of the mechanism, in upper case.
+   *
+   * <p>If this mechanism is present in the list of enabled mechanisms, an instance of this factory is loaded.
+   * Note that the simple name may be the same as the mechanism's SASL name.
+   *
+   * @return mechanism name
+   */
+  String getSimpleName();
+
+  /**
+   * Creates and returns the login user based on the given properties.
+   *
+   * @param properties config properties
+   * @return Hadoop user group information (ugi) of the login user
+   * @throws IOException presumably if the user cannot be created or logged in (implementation-defined)
+   */
+  UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException;
+
+  /**
+   * Creates a SaslServer for this mechanism (server side of a connection).
+   * The caller is responsible for {@link SaslServer#dispose disposing} the returned SaslServer.
+   * @param ugi Hadoop user group information
+   * @param properties config properties
+   * @return sasl server
+   * @throws SaslException presumably if the sasl server cannot be created (implementation-defined)
+   */
+  SaslServer createSaslServer(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+  /**
+   * Creates a SaslClient for this mechanism (client side of a connection).
+   * The caller is responsible for {@link SaslClient#dispose disposing} the returned SaslClient.
+   * @param ugi Hadoop user group information
+   * @param properties config properties
+   * @return sasl client
+   * @throws SaslException presumably if the sasl client cannot be created (implementation-defined)
+   */
+  SaslClient createSaslClient(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
new file mode 100644
index 0000000..9ed85ce
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.Sasl;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class SaslProperties {
+
+  /**
+   * SASL Quality of Protection (QOP) levels that can be negotiated; each carries its {@link Sasl#QOP} string.
+   */
+  enum QualityOfProtection {
+    AUTHENTICATION("auth"), // authentication only
+    INTEGRITY("auth-int"), // authentication plus integrity protection
+    PRIVACY("auth-conf"); // authentication plus integrity and confidentiality protection
+
+    public final String saslQop; // string form of this level, as stored under Sasl.QOP
+
+    QualityOfProtection(String saslQop) {
+      this.saslQop = saslQop;
+    }
+
+    public String getSaslQop() { // accessor for the saslQop field
+      return saslQop;
+    }
+  }
+
+  /**
+   * Gets the minimal set of SASL properties required during the negotiation process, either for encryption
+   * or for authentication only. A new mutable map is created and returned on every call.
+   * @param encryptionEnabled true to request the "auth-conf" QOP with high strength; false to request "auth" only
+   * @param wrappedChunkSize  configured chunk size, set as {@link Sasl#MAX_BUFFER}; used only with encryption
+   * @return map of SASL property names to values which will be used in negotiation
+   */
+  public static Map<String, String> getSaslProperties(boolean encryptionEnabled, int wrappedChunkSize) {
+    Map<String, String> saslProps = new HashMap<>();
+
+    if (encryptionEnabled) {
+      saslProps.put(Sasl.STRENGTH, "high");
+      saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); // "auth-conf"
+      saslProps.put(Sasl.MAX_BUFFER, Integer.toString(wrappedChunkSize)); // property values are strings
+      saslProps.put(Sasl.POLICY_NOPLAINTEXT, "true");
+    } else {
+      saslProps.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.getSaslQop()); // "auth"; no other property is set
+    }
+
+    return saslProps;
+  }
+
+  private SaslProperties() {
+    // Static utility class: the private constructor prevents instantiation from outside.
+  }
+}
\ No newline at end of file