You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/02/25 07:18:14 UTC

[21/29] drill git commit: DRILL-4280: CORE (security package)

DRILL-4280: CORE (security package)

+ Add AuthenticatorFactory interface
+ Kerberos implementation
  + includes SaslServer and SaslClient wrappers
+ Plain implementation
  + PlainServer implements SaslServer (unavailable in Java)
    for username/password based authentication
  + retrofit user authenticator
  + add logic for backward compatibility

+ Add AuthenticatorProvider interface to provide authenticator
  factories, and add two implementations:
  + DrillConfig and ScanResult based AuthenticatorProviderImpl
  + Default and system property based ClientAuthenticatorProvider

+ FastSaslServerFactory caches SaslServer factories
+ FastSaslClientFactory caches SaslClient factories

+ ServerAuthenticationHandler handles authentication on server-side
+ FailingRequestHandler to fail any message received
+ AuthenticationOutcomeListener handles authentication on client-side

security


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8a732c08
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8a732c08
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8a732c08

Branch: refs/heads/master
Commit: 8a732c0827cb0bb629718622de60ce8fcf70c571
Parents: f445b08
Author: Sudheesh Katkam <su...@apache.org>
Authored: Wed Jan 25 18:57:35 2017 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Feb 24 19:01:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../drill/exec/rpc/FailingRequestHandler.java   |  39 +++
 .../drill/exec/rpc/security/AuthStringUtil.java |  60 ++++
 .../security/AuthenticationOutcomeListener.java | 246 ++++++++++++++++
 .../exec/rpc/security/AuthenticatorFactory.java |  80 ++++++
 .../rpc/security/AuthenticatorProvider.java     |  33 +++
 .../rpc/security/AuthenticatorProviderImpl.java | 141 ++++++++++
 .../security/ClientAuthenticatorProvider.java   | 104 +++++++
 .../rpc/security/FastSaslClientFactory.java     | 113 ++++++++
 .../rpc/security/FastSaslServerFactory.java     | 111 ++++++++
 .../security/ServerAuthenticationHandler.java   | 280 +++++++++++++++++++
 .../rpc/security/kerberos/KerberosFactory.java  | 216 ++++++++++++++
 .../drill/exec/rpc/security/package-info.java   |  41 +++
 .../exec/rpc/security/plain/PlainFactory.java   | 117 ++++++++
 .../exec/rpc/security/plain/PlainServer.java    | 146 ++++++++++
 .../src/main/resources/drill-module.conf        |  11 +-
 16 files changed, 1737 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 846cd8b..b8f0c23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -108,9 +108,12 @@ public interface ExecConstants {
   String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
   String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
   String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
+  String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";
   String USER_AUTHENTICATION_ENABLED = "drill.exec.security.user.auth.enabled";
   String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
   String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
+  String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
+  String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
new file mode 100644
index 0000000..13733ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * This handler fails any request on the connection. Example use case: the peer is making requests
+ * before authenticating.
+ *
+ * @param <S> server connection type
+ */
+public class FailingRequestHandler<S extends ServerConnection<S>> implements RequestHandler<S> {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FailingRequestHandler.class);
+
+  @Override
+  public void handle(S connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+      throws RpcException {
+
+    // drops connection
+    throw new RpcException(String.format("Request of type %d is not yet allowed. Dropping connection to %s.",
+            rpcType, connection.getRemoteAddress()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
new file mode 100644
index 0000000..01740a5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class AuthStringUtil {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AuthStringUtil.class);
+
+  // ignores case
+  public static boolean listContains(final List<String> list, final String toCompare) {
+    for (final String string : list) {
+      if (string.equalsIgnoreCase(toCompare)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // converts list of strings to set of uppercase strings
+  public static Set<String> asSet(final List<String> list) {
+    if (list == null) {
+      return Sets.newHashSet();
+    }
+    return Sets.newHashSet(Iterators.transform(list.iterator(),
+        new Function<String, String>() {
+          @Nullable
+          @Override
+          public String apply(@Nullable String input) {
+            return input == null ? null : input.toUpperCase();
+          }
+        }));
+  }
+
+  // prevent instantiation
+  private AuthStringUtil() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
new file mode 100644
index 0000000..9c74ddc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the client-side.
+ *
+ * @param <T> handshake rpc type
+ * @param <C> Client connection type
+ * @param <HS> Handshake send type
+ * @param <HR> Handshake receive type
+ */
+public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientConnection,
+    HS extends MessageLite, HR extends MessageLite>
+    implements RpcOutcomeListener<SaslMessage> {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
+
+  private static final ImmutableMap<SaslStatus, SaslChallengeProcessor> CHALLENGE_PROCESSORS;
+  static {
+    final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
+    map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+    map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+    map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+    CHALLENGE_PROCESSORS = Maps.immutableEnumMap(map);
+  }
+
+  private final BasicClient<T, C, HS, HR> client;
+  private final C connection;
+  private final T saslRpcType;
+  private final UserGroupInformation ugi;
+  private final RpcOutcomeListener<?> completionListener;
+
+  public AuthenticationOutcomeListener(BasicClient<T, C, HS, HR> client,
+                                       C connection, T saslRpcType, UserGroupInformation ugi,
+                                       RpcOutcomeListener<?> completionListener) {
+    this.client = client;
+    this.connection = connection;
+    this.saslRpcType = saslRpcType;
+    this.ugi = ugi;
+    this.completionListener = completionListener;
+  }
+
+  public void initiate(final String mechanismName) {
+    logger.trace("Initiating SASL exchange.");
+    try {
+      final ByteString responseData;
+      final SaslClient saslClient = connection.getSaslClient();
+      if (saslClient.hasInitialResponse()) {
+        responseData = ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0]));
+      } else {
+        responseData = ByteString.EMPTY;
+      }
+      client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+          connection,
+          saslRpcType,
+          SaslMessage.newBuilder()
+              .setMechanism(mechanismName)
+              .setStatus(SaslStatus.SASL_START)
+              .setData(responseData)
+              .build(),
+          SaslMessage.class,
+          true /** the connection will not be backed up at this point */);
+      logger.trace("Initiated SASL exchange.");
+    } catch (final Exception e) {
+      completionListener.failed(RpcException.mapException(e));
+    }
+  }
+
+  @Override
+  public void failed(RpcException ex) {
+    completionListener.failed(RpcException.mapException(ex));
+  }
+
+  @Override
+  public void success(SaslMessage value, ByteBuf buffer) {
+    logger.trace("Server responded with message of type: {}", value.getStatus());
+    final SaslChallengeProcessor processor = CHALLENGE_PROCESSORS.get(value.getStatus());
+    if (processor == null) {
+      completionListener.failed(RpcException.mapException(
+          new SaslException("Server sent a corrupt message.")));
+    } else {
+      try {
+        final SaslChallengeContext context = new SaslChallengeContext(value, connection.getSaslClient(), ugi);
+
+        final SaslMessage saslResponse = processor.process(context);
+
+        if (saslResponse != null) {
+          client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+              connection, saslRpcType, saslResponse, SaslMessage.class,
+              true /** the connection will not be backed up at this point */);
+        } else {
+          // success
+          completionListener.success(null, null);
+        }
+      } catch (final Exception e) {
+        completionListener.failed(RpcException.mapException(e));
+      }
+    }
+  }
+
+  @Override
+  public void interrupted(InterruptedException e) {
+    completionListener.interrupted(e);
+  }
+
+  private static class SaslChallengeContext {
+
+    final SaslMessage challenge;
+    final SaslClient saslClient;
+    final UserGroupInformation ugi;
+
+    SaslChallengeContext(SaslMessage challenge, SaslClient saslClient, UserGroupInformation ugi) {
+      this.challenge = checkNotNull(challenge);
+      this.saslClient = checkNotNull(saslClient);
+      this.ugi = checkNotNull(ugi);
+    }
+  }
+
+  private interface SaslChallengeProcessor {
+
+    /**
+     * Process challenge from server, and return a response.
+     *
+     * Returns null iff SASL exchange is complete and successful.
+     *
+     * @param context challenge context
+     * @return response
+     * @throws Exception
+     */
+    SaslMessage process(SaslChallengeContext context) throws Exception;
+
+  }
+
+  private static class SaslInProgressProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public SaslMessage process(SaslChallengeContext context) throws Exception {
+      final SaslMessage.Builder response = SaslMessage.newBuilder();
+
+      final byte[] responseBytes = evaluateChallenge(context.ugi, context.saslClient,
+          context.challenge.getData().toByteArray());
+
+      final boolean isComplete = context.saslClient.isComplete();
+      logger.trace("Evaluated challenge. Completed? {}.", isComplete);
+      response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY);
+      // if isComplete, the client will get one more response from server
+      response.setStatus(isComplete ? SaslStatus.SASL_SUCCESS : SaslStatus.SASL_IN_PROGRESS);
+      return response.build();
+    }
+  }
+
+  private static class SaslSuccessProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public SaslMessage process(SaslChallengeContext context) throws Exception {
+      if (context.saslClient.isComplete()) {
+        logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
+        // setup security layers here..
+        return null;
+      } else {
+
+        // server completed before client; so try once, fail otherwise
+        evaluateChallenge(context.ugi, context.saslClient,
+            context.challenge.getData().toByteArray()); // discard response
+
+        if (context.saslClient.isComplete()) {
+          logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
+          // setup security layers here..
+          return null;
+        } else {
+          throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
+        }
+      }
+    }
+  }
+
+  private static class SaslFailedProcessor implements SaslChallengeProcessor {
+
+    @Override
+    public SaslMessage process(SaslChallengeContext context) throws Exception {
+      throw new SaslException("Authentication failed. Incorrect credentials?");
+    }
+  }
+
+  private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient,
+                                          final byte[] challengeBytes) throws SaslException {
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+        @Override
+        public byte[] run() throws Exception {
+          return saslClient.evaluateChallenge(challengeBytes);
+        }
+      });
+    } catch (final UndeclaredThrowableException e) {
+      throw new SaslException(
+          String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e.getCause());
+    } catch (final IOException | InterruptedException e) {
+      if (e instanceof SaslException) {
+        throw (SaslException) e;
+      } else {
+        throw new SaslException(
+            String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
new file mode 100644
index 0000000..307ae97
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An implementation of this factory will be initialized once at startup, if the authenticator is enabled
+ * (see {@link #getSimpleName}). For every request for this mechanism (i.e. after establishing a connection),
+ * {@link #createSaslServer} will be invoked on the server-side and {@link #createSaslClient} will be invoked
+ * on the client-side.
+ *
+ * Note:
+ * + Custom authenticators must have a default constructor.
+ *
+ * Examples: PlainFactory and KerberosFactory.
+ */
+public interface AuthenticatorFactory extends AutoCloseable {
+
+  /**
+   * Name of the mechanism, in upper case.
+   *
+   * If this mechanism is present in the list of enabled mechanisms, an instance of this factory is loaded. Note
+   * that the simple name may be the same as its SASL name.
+   *
+   * @return mechanism name
+   */
+  String getSimpleName();
+
+  /**
+   * Create and get the login user based on the given properties.
+   *
+   * @param properties config properties
+   * @return ugi
+   * @throws IOException
+   */
+  UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException;
+
+  /**
+   * The caller is responsible for {@link SaslServer#dispose disposing} the returned SaslServer.
+   *
+   * @param ugi ugi
+   * @param properties config properties
+   * @return sasl server
+   * @throws SaslException
+   */
+  SaslServer createSaslServer(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+  /**
+   * The caller is responsible for {@link SaslClient#dispose disposing} the returned SaslClient.
+   *
+   * @param ugi ugi
+   * @param properties config properties
+   * @return sasl client
+   * @throws SaslException
+   */
+  SaslClient createSaslClient(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
new file mode 100644
index 0000000..66ed98f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.SaslException;
+import java.util.Set;
+
+public interface AuthenticatorProvider extends AutoCloseable {
+
+  AuthenticatorFactory getAuthenticatorFactory(String name) throws SaslException;
+
+  Set<String> getAllFactoryNames();
+
+  boolean containsFactory(String name);
+
+  @Override
+  void close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
new file mode 100644
index 0000000..f4c60e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
+
+import javax.security.sasl.SaslException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AuthenticatorProviderImpl implements AuthenticatorProvider {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(AuthenticatorProviderImpl.class);
+
+  // Mapping: simple name -> authenticator factory
+  private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
+
+  @SuppressWarnings("unchecked")
+  public AuthenticatorProviderImpl(final DrillConfig config, final ScanResult scan) throws DrillbitStartupException {
+    List<String> configuredFactories = Lists.newArrayList();
+    if (config.hasPath(ExecConstants.AUTHENTICATION_MECHANISMS)) {
+      configuredFactories = config.getStringList(ExecConstants.AUTHENTICATION_MECHANISMS);
+    }
+
+    final Set<String> configuredFactoriesSet = AuthStringUtil.asSet(configuredFactories);
+    // to ensure backward compatibility of PLAIN config
+    if (config.hasPath(ExecConstants.USER_AUTHENTICATOR_IMPL)) {
+      configuredFactoriesSet.add(PlainFactory.SIMPLE_NAME);
+    }
+    if (configuredFactoriesSet.isEmpty()) {
+      return;
+    }
+
+    logger.debug("Configuring authenticator factories: {}", configuredFactories);
+    // PLAIN mechanism needs special handling due to UserAuthenticator
+    if (configuredFactoriesSet.remove(PlainFactory.SIMPLE_NAME)) {
+      // instantiated here, but closed in PlainFactory#close
+      final UserAuthenticator userAuthenticator = UserAuthenticatorFactory.createAuthenticator(config, scan);
+      final PlainFactory factory = new PlainFactory(userAuthenticator);
+      authFactories.put(PlainFactory.SIMPLE_NAME, factory);
+      logger.trace("Plain mechanism enabled.");
+    }
+
+    // Then, load other authentication factories, if any
+    if (!configuredFactoriesSet.isEmpty()) {
+      final Collection<Class<? extends AuthenticatorFactory>> factoryImpls =
+          scan.getImplementations(AuthenticatorFactory.class);
+      logger.debug("Found AuthenticatorFactory implementations: {}", factoryImpls);
+
+      for (final Class<? extends AuthenticatorFactory> clazz : factoryImpls) {
+        Constructor<? extends AuthenticatorFactory> validConstructor = null;
+        for (final Constructor<?> c : clazz.getConstructors()) {
+          final Class<?>[] params = c.getParameterTypes();
+          if (params.length == 0) {
+            validConstructor = (Constructor<? extends AuthenticatorFactory>) c; // unchecked
+            break;
+          }
+        }
+
+        if (validConstructor == null) {
+          logger.warn("Skipping authentication factory class {}. It must implement at least one constructor " +
+              "with signature [{}()]", clazz.getCanonicalName(), clazz.getName());
+          continue;
+        }
+
+        try {
+          final AuthenticatorFactory instance = validConstructor.newInstance();
+          if (configuredFactoriesSet.remove(instance.getSimpleName().toUpperCase())) {
+            authFactories.put(instance.getSimpleName(), instance);
+          }
+        } catch (IllegalArgumentException | IllegalAccessException |
+            InstantiationException | InvocationTargetException e) {
+          throw new DrillbitStartupException(
+              String.format("Failed to create authentication factory of type '%s'",
+                  clazz.getCanonicalName()), e);
+        }
+      }
+    }
+
+    if (authFactories.size() == 0) {
+      throw new DrillbitStartupException("Authentication enabled, but no mechanism was configured correctly. " +
+          "Please check authentication configuration.");
+    }
+    logger.info("Configured authentication mechanisms: {}", authFactories.keySet());
+  }
+
+  @Override
+  public AuthenticatorFactory getAuthenticatorFactory(final String name) throws SaslException {
+    final AuthenticatorFactory mechanism = authFactories.get(name);
+    if (mechanism == null) {
+      throw new SaslException(String.format("Unknown mechanism: '%s' Configured mechanisms: %s",
+          name, authFactories.keySet()));
+    }
+    return mechanism;
+  }
+
+  @Override
+  public Set<String> getAllFactoryNames() {
+    return authFactories.keySet();
+  }
+
+  @Override
+  public boolean containsFactory(final String name) {
+    return authFactories.containsKey(name);
+  }
+
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(authFactories.values());
+    authFactories.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
new file mode 100644
index 0000000..bdcbcf5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+
+import javax.security.sasl.SaslException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Client-side {@link AuthenticatorProvider}. Registers the Kerberos and Plain factories provided by
+ * Drill, followed by any custom factories listed in the "drill.customAuthFactories" system property
+ * (comma-separated class names).
+ */
+public class ClientAuthenticatorProvider implements AuthenticatorProvider {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(ClientAuthenticatorProvider.class);
+
+  // comma-separated class names of custom factories; null if the property is not set
+  private static final String customFactories = System.getProperty("drill.customAuthFactories");
+
+  // holds the shared instance, which is created when this holder class is first used
+  private static final class Holder {
+    static final ClientAuthenticatorProvider INSTANCE = new ClientAuthenticatorProvider();
+
+    // prevent instantiation
+    private Holder() {
+    }
+  }
+
+  /**
+   * @return the shared provider instance
+   */
+  public static ClientAuthenticatorProvider getInstance() {
+    return Holder.INSTANCE;
+  }
+
+  // Mapping: simple name -> authenticator factory
+  private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
+
+  /**
+   * Registers the factories provided by Drill, and then the custom factories, if any. A custom factory
+   * replaces any factory previously registered under the same simple name.
+   *
+   * @throws DrillRuntimeException if a custom factory class cannot be loaded or instantiated
+   */
+  public ClientAuthenticatorProvider() {
+    // factories provided by Drill
+    final KerberosFactory kerberosFactory = new KerberosFactory();
+    authFactories.put(kerberosFactory.getSimpleName(), kerberosFactory);
+    final PlainFactory plainFactory = new PlainFactory();
+    authFactories.put(plainFactory.getSimpleName(), plainFactory);
+
+    // then, custom factories
+    if (customFactories != null) {
+      for (final String entry : customFactories.split(",")) {
+        // tolerate whitespace around separators, and empty entries (e.g. a trailing comma)
+        final String className = entry.trim();
+        if (className.isEmpty()) {
+          continue;
+        }
+        registerCustomFactory(className);
+      }
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Configured mechanisms: {}", authFactories.keySet());
+    }
+  }
+
+  /**
+   * Loads, instantiates and registers the custom factory with the given class name. Classes that do
+   * not implement {@link AuthenticatorFactory} are skipped with a warning.
+   *
+   * @param className fully qualified name of the factory class
+   * @throws DrillRuntimeException if the class cannot be loaded or instantiated
+   */
+  private void registerCustomFactory(final String className) {
+    try {
+      final Class<?> clazz = Class.forName(className);
+      if (!AuthenticatorFactory.class.isAssignableFrom(clazz)) {
+        logger.warn("Ignoring custom auth factory '{}' since it does not implement {}.",
+            className, AuthenticatorFactory.class.getCanonicalName());
+        return;
+      }
+      final AuthenticatorFactory instance = (AuthenticatorFactory) clazz.newInstance();
+      authFactories.put(instance.getSimpleName(), instance);
+    } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+      // name the offending class, so that a misconfigured property is easy to track down
+      throw new DrillRuntimeException(
+          String.format("Failed to create auth factory of type '%s'.", className), e);
+    }
+  }
+
+  /**
+   * Returns the authenticator factory registered under the given mechanism name.
+   *
+   * @param name simple name of the mechanism
+   * @return the factory registered under {@code name}; never null
+   * @throws SaslException if no factory is registered under {@code name}
+   */
+  @Override
+  public AuthenticatorFactory getAuthenticatorFactory(final String name) throws SaslException {
+    final AuthenticatorFactory mechanism = authFactories.get(name);
+    if (mechanism == null) {
+      throw new SaslException(String.format("Unknown mechanism: '%s' Configured mechanisms: %s",
+          name, authFactories.keySet()));
+    }
+    return mechanism;
+  }
+
+  /**
+   * @return read-only view of the simple names of all registered factories
+   */
+  @Override
+  public Set<String> getAllFactoryNames() {
+    // hand out a read-only view, so that callers cannot remove factories through the key set
+    return java.util.Collections.unmodifiableSet(authFactories.keySet());
+  }
+
+  /**
+   * @param name simple name of the mechanism
+   * @return true if a factory is registered under {@code name}, false otherwise
+   */
+  @Override
+  public boolean containsFactory(final String name) {
+    return authFactories.containsKey(name);
+  }
+
+  /**
+   * Closes all registered factories, and then empties the registry. The registry is emptied only if
+   * closing the factories returns without throwing.
+   *
+   * @throws Exception propagated from {@code AutoCloseables.close}
+   */
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(authFactories.values());
+    authFactories.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
new file mode 100644
index 0000000..c8699b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Caches the available {@link SaslClientFactory} instances by mechanism name, since
+ * {@link Sasl#createSaslClient} is known to be slow.
+ */
+public class FastSaslClientFactory implements SaslClientFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FastSaslClientFactory.class);
+
+  // The instance is created lazily; by then, all relevant providers should have registered with
+  // Security, so that Sasl#getSaslClientFactories returns the latest possible list of factories.
+  private static final class Holder {
+    static final FastSaslClientFactory INSTANCE = new FastSaslClientFactory();
+
+    // not meant to be instantiated
+    private Holder() {
+    }
+  }
+
+  /**
+   * @return the shared factory instance
+   */
+  public static FastSaslClientFactory getInstance() {
+    return Holder.INSTANCE;
+  }
+
+  // package private; rebuilds the cache of the shared instance
+  @VisibleForTesting
+  static void reload() {
+    getInstance().refresh();
+  }
+
+  // mechanism name -> factories that list the mechanism; non-final so that tests can reload it
+  private ImmutableMap<String, List<SaslClientFactory>> clientFactories;
+
+  // instances are obtained through getInstance
+  private FastSaslClientFactory() {
+    refresh();
+  }
+
+  // (Re)builds the cache from the factories currently known to Sasl. Used in initialization, and
+  // for testing.
+  private void refresh() {
+    final Map<String, List<SaslClientFactory>> byMechanism = Maps.newHashMap();
+
+    final Enumeration<SaslClientFactory> available = Sasl.getSaslClientFactories();
+    while (available.hasMoreElements()) {
+      final SaslClientFactory candidate = available.nextElement();
+      // Null properties, so that the factory lists all possibilities. The properties passed when
+      // instantiating a client are what really matter. See createSaslClient.
+      final String[] names = candidate.getMechanismNames(null);
+      for (final String name : names) {
+        List<SaslClientFactory> bucket = byMechanism.get(name);
+        if (bucket == null) {
+          bucket = new ArrayList<SaslClientFactory>();
+          byMechanism.put(name, bucket);
+        }
+        bucket.add(candidate);
+      }
+    }
+
+    clientFactories = ImmutableMap.copyOf(byMechanism);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Registered sasl client factories: {}", clientFactories.keySet());
+    }
+  }
+
+  /**
+   * Tries the given mechanisms in order and, for each of them, the cached factories in order.
+   *
+   * @return the first non-null client created by a cached factory, or null if there is none
+   * @throws SaslException if a factory fails while creating the client
+   */
+  @Override
+  public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName,
+                                     Map<String, ?> props, CallbackHandler cbh) throws SaslException {
+    for (final String mechanism : mechanisms) {
+      final List<SaslClientFactory> candidates = clientFactories.get(mechanism);
+      if (candidates == null) {
+        // no factory lists this mechanism; try the next one
+        continue;
+      }
+      for (final SaslClientFactory candidate : candidates) {
+        final SaslClient client = candidate.createSaslClient(new String[]{mechanism}, authorizationId,
+            protocol, serverName, props, cbh);
+        if (client != null) {
+          return client;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param props ignored; the names of all cached mechanisms are returned
+   * @return names of the mechanisms for which at least one factory is cached
+   */
+  @Override
+  public String[] getMechanismNames(final Map<String, ?> props) {
+    final String[] names = clientFactories.keySet().toArray(new String[0]);
+    return names;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
new file mode 100644
index 0000000..0fe15af
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Caches the available {@link SaslServerFactory} instances by mechanism name, since
+ * {@link Sasl#createSaslServer} is known to be slow.
+ * This is a modified version of Apache Hadoop's implementation.
+ */
+public final class FastSaslServerFactory implements SaslServerFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FastSaslServerFactory.class);
+
+  // The instance is created lazily; by then, all relevant providers should have registered with
+  // Security, so that Sasl#getSaslServerFactories returns the latest possible list of factories.
+  private static final class Holder {
+    static final FastSaslServerFactory INSTANCE = new FastSaslServerFactory();
+
+    // not meant to be instantiated
+    private Holder() {
+    }
+  }
+
+  /**
+   * @return the shared factory instance
+   */
+  public static FastSaslServerFactory getInstance() {
+    return Holder.INSTANCE;
+  }
+
+  // package private; rebuilds the cache of the shared instance
+  @VisibleForTesting
+  static void reload() {
+    getInstance().refresh();
+  }
+
+  // mechanism name -> factories that list the mechanism; non-final so that tests can reload it
+  private ImmutableMap<String, List<SaslServerFactory>> serverFactories;
+
+  // instances are obtained through getInstance
+  private FastSaslServerFactory() {
+    refresh();
+  }
+
+  // (Re)builds the cache from the factories currently known to Sasl. Used in initialization, and
+  // for testing.
+  private void refresh() {
+    final Map<String, List<SaslServerFactory>> byMechanism = Maps.newHashMap();
+
+    final Enumeration<SaslServerFactory> available = Sasl.getSaslServerFactories();
+    while (available.hasMoreElements()) {
+      final SaslServerFactory candidate = available.nextElement();
+      // Null properties, so that the factory lists all possibilities. The properties passed when
+      // instantiating a server are what really matter. See createSaslServer.
+      final String[] names = candidate.getMechanismNames(null);
+      for (final String name : names) {
+        List<SaslServerFactory> bucket = byMechanism.get(name);
+        if (bucket == null) {
+          bucket = new ArrayList<SaslServerFactory>();
+          byMechanism.put(name, bucket);
+        }
+        bucket.add(candidate);
+      }
+    }
+
+    serverFactories = ImmutableMap.copyOf(byMechanism);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Registered sasl server factories: {}", serverFactories.keySet());
+    }
+  }
+
+  /**
+   * Tries, in order, the cached factories that list the given mechanism.
+   *
+   * @return the first non-null server created by a cached factory, or null if there is none
+   * @throws SaslException if a factory fails while creating the server
+   */
+  @Override
+  public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
+                                     CallbackHandler cbh) throws SaslException {
+    final List<SaslServerFactory> candidates = serverFactories.get(mechanism);
+    if (candidates == null) {
+      // no factory lists this mechanism
+      return null;
+    }
+    for (final SaslServerFactory candidate : candidates) {
+      final SaslServer server = candidate.createSaslServer(mechanism, protocol, serverName, props, cbh);
+      if (server != null) {
+        return server;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param props ignored; the names of all cached mechanisms are returned
+   * @return names of the mechanisms for which at least one factory is cached
+   */
+  @Override
+  public String[] getMechanismNames(final Map<String, ?> props) {
+    final String[] names = serverFactories.keySet().toArray(new String[0]);
+    return names;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
new file mode 100644
index 0000000..bf34d57
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.ServerConnection;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the server-side.
+ *
+ * @param <S> Server connection type
+ * @param <T> RPC type
+ */
+public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extends EnumLite>
+    implements RequestHandler<S> {
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(ServerAuthenticationHandler.class);
+
+  private static final ImmutableMap<SaslStatus, SaslResponseProcessor> RESPONSE_PROCESSORS;
+
+  static {
+    final Map<SaslStatus, SaslResponseProcessor> map = new EnumMap<>(SaslStatus.class);
+    map.put(SaslStatus.SASL_START, new SaslStartProcessor());
+    map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+    map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+    map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+    RESPONSE_PROCESSORS = Maps.immutableEnumMap(map);
+  }
+
+  private final RequestHandler<S> requestHandler;
+  private final int saslRequestTypeValue;
+  private final T saslResponseType;
+
+  public ServerAuthenticationHandler(final RequestHandler<S> requestHandler, final int saslRequestTypeValue,
+                                     final T saslResponseType) {
+    this.requestHandler = requestHandler;
+    this.saslRequestTypeValue = saslRequestTypeValue;
+    this.saslResponseType = saslResponseType;
+  }
+
+  @Override
+  public void handle(S connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+      throws RpcException {
+    final String remoteAddress = connection.getRemoteAddress().toString();
+
+    // exchange involves server "challenges" and client "responses" (initiated by client)
+    if (saslRequestTypeValue == rpcType) {
+      final SaslMessage saslResponse;
+      try {
+        saslResponse = SaslMessage.PARSER.parseFrom(new ByteBufInputStream(pBody));
+      } catch (final InvalidProtocolBufferException e) {
+        handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+        return;
+      }
+
+      logger.trace("Received SASL message {} from {}", saslResponse.getStatus(), remoteAddress);
+      final SaslResponseProcessor processor = RESPONSE_PROCESSORS.get(saslResponse.getStatus());
+      if (processor == null) {
+        logger.info("Unknown message type from client from {}. Will stop authentication.", remoteAddress);
+        handleAuthFailure(remoteAddress, sender, new SaslException("Received unexpected message"),
+            saslResponseType);
+        return;
+      }
+
+      final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, remoteAddress,
+          sender, requestHandler, saslResponseType);
+      try {
+        processor.process(context);
+      } catch (final Exception e) {
+        handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+      }
+    } else {
+
+      // this handler only handles messages of SASL_MESSAGE_VALUE type
+
+      // the response type for this request type is likely known from UserRpcConfig,
+      // but the client should not be making any requests before authenticating.
+      // drop connection
+      throw new RpcException(
+          String.format("Request of type %d is not allowed without authentication. " +
+                  "Client on %s must authenticate before making requests. Connection dropped.",
+              rpcType, remoteAddress));
+    }
+  }
+
+  private static class SaslResponseContext<S extends ServerConnection<S>, T extends EnumLite> {
+
+    final SaslMessage saslResponse;
+    final S connection;
+    final String remoteAddress;
+    final ResponseSender sender;
+    final RequestHandler<S> requestHandler;
+    final T saslResponseType;
+
+    SaslResponseContext(SaslMessage saslResponse, S connection, String remoteAddress, ResponseSender sender,
+                        RequestHandler<S> requestHandler, T saslResponseType) {
+      this.saslResponse = checkNotNull(saslResponse);
+      this.connection = checkNotNull(connection);
+      this.remoteAddress = checkNotNull(remoteAddress);
+      this.sender = checkNotNull(sender);
+      this.requestHandler = checkNotNull(requestHandler);
+      this.saslResponseType = checkNotNull(saslResponseType);
+    }
+  }
+
+  private interface SaslResponseProcessor {
+
+    /**
+     * Process response from client, and if there are no exceptions, send response using
+     * {@link SaslResponseContext#sender}. Otherwise, throw the exception.
+     *
+     * @param context response context
+     */
+    <S extends ServerConnection<S>, T extends EnumLite>
+    void process(SaslResponseContext<S, T> context) throws Exception;
+
+  }
+
+  private static class SaslStartProcessor implements SaslResponseProcessor {
+
+    @Override
+    public <S extends ServerConnection<S>, T extends EnumLite>
+    void process(SaslResponseContext<S, T> context) throws Exception {
+      context.connection.initSaslServer(context.saslResponse.getMechanism());
+
+      // assume #evaluateResponse must be called at least once
+      RESPONSE_PROCESSORS.get(SaslStatus.SASL_IN_PROGRESS).process(context);
+    }
+  }
+
+  private static class SaslInProgressProcessor implements SaslResponseProcessor {
+
+    @Override
+    public <S extends ServerConnection<S>, T extends EnumLite>
+    void process(SaslResponseContext<S, T> context) throws Exception {
+      final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+      final SaslServer saslServer = context.connection.getSaslServer();
+
+      final byte[] challengeBytes = evaluateResponse(saslServer, context.saslResponse.getData().toByteArray());
+
+      if (saslServer.isComplete()) {
+        challenge.setStatus(SaslStatus.SASL_SUCCESS);
+        if (challengeBytes != null) {
+          challenge.setData(ByteString.copyFrom(challengeBytes));
+        }
+
+        handleSuccess(context, challenge, saslServer);
+      } else {
+        challenge.setStatus(SaslStatus.SASL_IN_PROGRESS)
+            .setData(ByteString.copyFrom(challengeBytes));
+        context.sender.send(new Response(context.saslResponseType, challenge.build()));
+      }
+    }
+  }
+
+  // only when client succeeds first
+  private static class SaslSuccessProcessor implements SaslResponseProcessor {
+
+    @Override
+    public <S extends ServerConnection<S>, T extends EnumLite>
+    void process(SaslResponseContext<S, T> context) throws Exception {
+      // at this point, #isComplete must be false; so try once, fail otherwise
+      final SaslServer saslServer = context.connection.getSaslServer();
+
+      evaluateResponse(saslServer, context.saslResponse.getData().toByteArray()); // discard challenge
+
+      if (saslServer.isComplete()) {
+        final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+        challenge.setStatus(SaslStatus.SASL_SUCCESS);
+
+        handleSuccess(context, challenge, saslServer);
+      } else {
+        logger.info("Failed to authenticate client from {}", context.remoteAddress);
+        throw new SaslException("Client allegedly succeeded authentication, but server did not. Suspicious?");
+      }
+    }
+  }
+
+  private static class SaslFailedProcessor implements SaslResponseProcessor {
+
+    @Override
+    public <S extends ServerConnection<S>, T extends EnumLite>
+    void process(SaslResponseContext<S, T> context) throws Exception {
+      logger.info("Client from {} failed authentication graciously, and does not want to continue.",
+          context.remoteAddress);
+      throw new SaslException("Client graciously failed authentication");
+    }
+  }
+
+  private static byte[] evaluateResponse(final SaslServer saslServer,
+                                         final byte[] responseBytes) throws SaslException {
+    try {
+      return UserGroupInformation.getLoginUser()
+          .doAs(new PrivilegedExceptionAction<byte[]>() {
+            @Override
+            public byte[] run() throws Exception {
+              return saslServer.evaluateResponse(responseBytes);
+            }
+          });
+    } catch (final UndeclaredThrowableException e) {
+      throw new SaslException(String.format("Unexpected failure trying to authenticate using %s",
+          saslServer.getMechanismName()), e.getCause());
+    } catch (final IOException | InterruptedException e) {
+      if (e instanceof SaslException) {
+        throw (SaslException) e;
+      } else {
+        throw new SaslException(String.format("Unexpected failure trying to authenticate using %s",
+            saslServer.getMechanismName()), e);
+      }
+    }
+  }
+
+  private static <S extends ServerConnection<S>, T extends EnumLite>
+  void handleSuccess(final SaslResponseContext<S, T> context, final SaslMessage.Builder challenge,
+                     final SaslServer saslServer) throws IOException {
+    context.connection.changeHandlerTo(context.requestHandler);
+    context.connection.finalizeSaslSession();
+    context.sender.send(new Response(context.saslResponseType, challenge.build()));
+
+    // setup security layers here..
+
+    if (logger.isTraceEnabled()) {
+      logger.trace("Authenticated {} successfully using {} from {}", saslServer.getAuthorizationID(),
+          saslServer.getMechanismName(), context.remoteAddress);
+    }
+  }
+
+  private static final SaslMessage SASL_FAILED_MESSAGE =
+      SaslMessage.newBuilder().setStatus(SaslStatus.SASL_FAILED).build();
+
+  private static <T extends EnumLite>
+  void handleAuthFailure(final String remoteAddress, final ResponseSender sender,
+                         final Exception e, final T saslResponseType) throws RpcException {
+    logger.debug("Authentication failed from client {} due to {}", remoteAddress, e);
+
+    // inform the client that authentication failed, and no more
+    sender.send(new Response(saslResponseType, SASL_FAILED_MESSAGE));
+
+    // drop connection
+    throw new RpcException(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
new file mode 100644
index 0000000..855dd8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.kerberos;
+
+import org.apache.drill.common.KerberosUtil;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.FastSaslClientFactory;
+import org.apache.drill.exec.rpc.security.FastSaslServerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+public class KerberosFactory implements AuthenticatorFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KerberosFactory.class);
+
+  private static final String DRILL_SERVICE_NAME = System.getProperty("drill.principal.primary", "drill");
+
+  /**
+   * @return the simple name of this mechanism, {@link KerberosUtil#KERBEROS_SIMPLE_NAME}
+   */
+  @Override
+  public String getSimpleName() {
+    final String simpleName = KerberosUtil.KERBEROS_SIMPLE_NAME;
+    return simpleName;
+  }
+
+  /**
+   * Sets Hadoop's security authentication to Kerberos through
+   * {@link UserGroupInformation#setConfiguration}, and then obtains the user in one of three ways:
+   * from the current subject (if {@link DrillProperties#KERBEROS_FROM_SUBJECT} parses to true),
+   * by logging in from a keytab (if {@link DrillProperties#KEYTAB} is set), or otherwise from the
+   * current user.
+   *
+   * @param properties connection properties
+   * @return the user to authenticate as
+   * @throws IOException a {@link SaslException}, if the login fails
+   */
+  @Override
+  public UserGroupInformation createAndLoginUser(final Map<String, ?> properties) throws IOException {
+    final Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+        UserGroupInformation.AuthenticationMethod.KERBEROS.toString());
+    UserGroupInformation.setConfiguration(conf);
+
+    final String keytab = (String) properties.get(DrillProperties.KEYTAB);
+    final boolean assumeSubject = properties.containsKey(DrillProperties.KERBEROS_FROM_SUBJECT) &&
+        Boolean.parseBoolean((String) properties.get(DrillProperties.KERBEROS_FROM_SUBJECT));
+    try {
+      final UserGroupInformation ugi;
+      if (assumeSubject) {
+        ugi = UserGroupInformation.getUGIFromSubject(Subject.getSubject(AccessController.getContext()));
+        logger.debug("Assuming subject for {}.", ugi.getShortUserName());
+      } else {
+        if (keytab != null) {
+          ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+              (String) properties.get(DrillProperties.USER), keytab);
+          logger.debug("Logged in {} using keytab.", ugi.getShortUserName());
+        } else {
+          // includes Kerberos ticket login
+          ugi = UserGroupInformation.getCurrentUser();
+          logger.debug("Logged in {} using ticket.", ugi.getShortUserName());
+        }
+      }
+      return ugi;
+    } catch (final IOException e) {
+      logger.debug("Login failed.", e);
+      // fall back to the exception itself if it has no cause, so that the failure is not lost
+      final Throwable cause = e.getCause() != null ? e.getCause() : e;
+      if (cause instanceof LoginException) {
+        throw new SaslException("Failed to login.", cause);
+      }
+      throw new SaslException("Unexpected failure trying to login.", cause);
+    }
+  }
+
+  /**
+   * Creates a SASL server for the {@link KerberosUtil#KERBEROS_SASL_NAME} mechanism, as the given
+   * user. The short user name is passed as the protocol, and the host name of the user's Kerberos
+   * name as the server name.
+   *
+   * @param ugi        user to create the server as
+   * @param properties properties passed on to the SASL server factory
+   * @return the server returned by {@link FastSaslServerFactory#createSaslServer}
+   * @throws SaslException if the server cannot be created, or the creation is interrupted
+   */
+  @Override
+  public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    try {
+      final String primaryName = ugi.getShortUserName();
+      final String instanceName = new HadoopKerberosName(ugi.getUserName()).getHostName();
+
+      final SaslServer saslServer = ugi.doAs(new PrivilegedExceptionAction<SaslServer>() {
+        @Override
+        public SaslServer run() throws Exception {
+          return FastSaslServerFactory.getInstance()
+              .createSaslServer(KerberosUtil.KERBEROS_SASL_NAME, primaryName, instanceName, properties,
+                  new KerberosServerCallbackHandler());
+        }
+      });
+      logger.trace("GSSAPI SaslServer created.");
+      return saslServer;
+    } catch (final UndeclaredThrowableException e) {
+      final Throwable cause = e.getCause();
+      logger.debug("Authentication failed.", cause);
+      if (cause instanceof SaslException) {
+        throw (SaslException) cause;
+      } else {
+        throw new SaslException("Unexpected failure trying to authenticate using Kerberos", cause);
+      }
+    } catch (final InterruptedException e) {
+      // restore the interrupt status, which is otherwise lost once the exception is wrapped
+      Thread.currentThread().interrupt();
+      logger.debug("Authentication failed.", e);
+      throw new SaslException("Unexpected failure trying to authenticate using Kerberos", e);
+    } catch (final IOException e) {
+      logger.debug("Authentication failed.", e);
+      throw new SaslException("Unexpected failure trying to authenticate using Kerberos", e);
+    }
+  }
+
+  /**
+   * Creates a GSSAPI {@link SaslClient} inside a {@code doAs} of the given user.
+   * <p>
+   * The service principal is resolved from {@code properties} and split into parts; the first part is used as
+   * the service name and the second as the service host name. The third part is ignored.
+   *
+   * @param ugi        user on whose behalf the client is created
+   * @param properties connection properties; also forwarded to the SASL client factory
+   * @return the SASL client produced by the factory
+   * @throws SaslException if the service principal cannot be resolved or the client cannot be created
+   */
+  @Override
+  public SaslClient createSaslClient(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    final String servicePrincipal = getServicePrincipal(properties);
+
+    final String[] parts = KerberosUtil.splitPrincipalIntoParts(servicePrincipal);
+    final String serviceName = parts[0];
+    final String serviceHostName = parts[1];
+    // ignore parts[2]; GSSAPI gets the realm info from the ticket
+    try {
+      final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
+
+        @Override
+        public SaslClient run() throws Exception {
+          return FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},
+              null /* authorization ID */, serviceName, serviceHostName, properties,
+              new CallbackHandler() {
+                // This handler supplies nothing: every callback request is rejected.
+                @Override
+                public void handle(final Callback[] callbacks)
+                    throws IOException, UnsupportedCallbackException {
+                  throw new UnsupportedCallbackException(callbacks[0]);
+                }
+              });
+        }
+      });
+      logger.debug("GSSAPI SaslClient created to authenticate to {} running on {}",
+          serviceName, serviceHostName);
+      return saslClient;
+    } catch (final UndeclaredThrowableException e) {
+      logger.debug("Authentication failed.", e);
+      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
+          serviceHostName), e.getCause());
+    } catch (final SaslException e) {
+      // Already the right type; propagate unchanged.
+      logger.debug("Authentication failed.", e);
+      throw e;
+    } catch (final IOException e) {
+      logger.debug("Authentication failed.", e);
+      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
+          serviceHostName), e);
+    } catch (final InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      logger.debug("Authentication failed.", e);
+      throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
+          serviceHostName), e);
+    }
+  }
+
+  /**
+   * Does nothing; the body of this method is intentionally empty.
+   */
+  @Override
+  public void close() throws Exception {
+    // no-op
+  }
+
+  /**
+   * Server-side callback handler that only answers {@link AuthorizeCallback}s, and authorizes one only when
+   * its authentication ID equals its authorization ID.
+   */
+  private static class KerberosServerCallbackHandler implements CallbackHandler {
+
+    /**
+     * Processes the callbacks in order.
+     *
+     * @throws UnsupportedCallbackException for any callback that is not an {@link AuthorizeCallback}
+     * @throws SaslException                if the authentication and authorization IDs differ
+     */
+    @Override
+    public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      for (final Callback current : callbacks) {
+        if (!(current instanceof AuthorizeCallback)) {
+          throw new UnsupportedCallbackException(current);
+        }
+
+        final AuthorizeCallback authorization = (AuthorizeCallback) current;
+        final String authenticationId = authorization.getAuthenticationID();
+        final String authorizationId = authorization.getAuthorizationID();
+        if (authenticationId.equals(authorizationId)) {
+          authorization.setAuthorized(true);
+          continue;
+        }
+
+        throw new SaslException("Drill expects authorization ID and authentication ID to match. " +
+            "Use inbound impersonation feature so one entity can act on behalf of another.");
+      }
+    }
+  }
+
+  /**
+   * Resolves the service principal to authenticate to.
+   * <p>
+   * If {@link DrillProperties#SERVICE_PRINCIPAL} is set, it is returned as-is. Otherwise the principal is
+   * assembled from the service name (default {@code DRILL_SERVICE_NAME}), the lower-cased service host and
+   * the realm (default: the realm reported by {@code KerberosUtil.getDefaultRealm()}).
+   *
+   * @param properties connection properties
+   * @return the service principal
+   * @throws SaslException if no service host is available, or the default realm cannot be resolved
+   */
+  private static String getServicePrincipal(final Map<String, ?> properties) throws SaslException {
+    final String principal = (String) properties.get(DrillProperties.SERVICE_PRINCIPAL);
+    if (principal != null) {
+      return principal;
+    }
+
+    final String serviceHostname = (String) properties.get(DrillProperties.SERVICE_HOST);
+    if (serviceHostname == null) {
+      throw new SaslException("Unknown Drillbit hostname. Check connection parameters?");
+    }
+
+    final String serviceName = (String) properties.get(DrillProperties.SERVICE_NAME);
+    final String realm = (String) properties.get(DrillProperties.REALM);
+    try {
+      return KerberosUtil.getPrincipalFromParts(
+          serviceName == null ? DRILL_SERVICE_NAME : serviceName,
+          // see HADOOP-7988; Locale.ROOT keeps the result independent of the JVM's default locale
+          serviceHostname.toLowerCase(java.util.Locale.ROOT),
+          realm == null ? KerberosUtil.getDefaultRealm() : realm
+      );
+    } catch (final ClassNotFoundException | NoSuchMethodException |
+        IllegalAccessException | InvocationTargetException e) {
+      // Keep the underlying failure attached so the reason the realm lookup failed is not lost.
+      throw new SaslException(
+          "Could not resolve realm information. Please set explicitly in connection parameters.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
new file mode 100644
index 0000000..5c6eff3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Communication security.
+ * <p>
+ * Drill uses Java's SASL library to authenticate clients (users and other bits). This is achieved using
+ * {@link org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener} on the client-side, and
+ * {@link org.apache.drill.exec.rpc.security.ServerAuthenticationHandler} on the server-side.
+ * <p>
+ * If authentication is enabled, {@link org.apache.drill.exec.rpc.security.AuthenticatorFactory authenticator factory}
+ * implementations are discovered at startup from {@link org.apache.drill.common.scanner.persistence.ScanResult
+ * scan result} using {@link org.apache.drill.exec.rpc.security.AuthenticatorProviderImpl}. At connection time, after
+ * handshake, if either side requires authentication, a series of SASL messages is exchanged. Without successful
+ * authentication, any subsequent message will result in failure and the connection being dropped.
+ * <p>
+ * Out of the box, Drill supports {@link org.apache.drill.exec.rpc.security.kerberos.KerberosFactory KERBEROS}
+ * (through GSSAPI) and {@link org.apache.drill.exec.rpc.security.plain.PlainFactory PLAIN} (through
+ * {@link org.apache.drill.exec.rpc.user.security.UserAuthenticator}) mechanisms.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/DRILL-4280">
+ * DRILL-4280 (design and configuration)</a>
+ * @see <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/sasl/package-summary.html">
+ * Java's SASL Library</a>
+ */
+package org.apache.drill.exec.rpc.security;

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
new file mode 100644
index 0000000..4a0db95
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.plain;
+
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.FastSaslClientFactory;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+public class PlainFactory implements AuthenticatorFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlainFactory.class);
+
+  public static final String SIMPLE_NAME = PlainServer.MECHANISM_NAME;
+
+  private final UserAuthenticator authenticator;
+
+  public PlainFactory() {
+    this.authenticator = null;
+  }
+
+  public PlainFactory(final UserAuthenticator authenticator) {
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public String getSimpleName() {
+    return SIMPLE_NAME;
+  }
+
+  @Override
+  public UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException {
+    final Configuration conf = new Configuration();
+    UserGroupInformation.setConfiguration(conf);
+    try {
+      return UserGroupInformation.getCurrentUser();
+    } catch (final IOException e) {
+      logger.debug("Login failed.", e);
+      final Throwable cause = e.getCause();
+      if (cause instanceof LoginException) {
+        throw new SaslException("Failed to login.", cause);
+      }
+      throw new SaslException("Unexpected failure trying to login. ", cause);
+    }
+  }
+
+  @Override
+  public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    return new PlainServer(authenticator, properties);
+  }
+
+  @Override
+  public SaslClient createSaslClient(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    final String userName = (String) properties.get(DrillProperties.USER);
+    final String password = (String) properties.get(DrillProperties.PASSWORD);
+
+    return FastSaslClientFactory.getInstance().createSaslClient(new String[]{SIMPLE_NAME},
+        null /** authorization ID */, null, null, properties, new CallbackHandler() {
+          @Override
+          public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+            for (final Callback callback : callbacks) {
+              if (callback instanceof NameCallback) {
+                NameCallback.class.cast(callback).setName(userName);
+                continue;
+              }
+              if (callback instanceof PasswordCallback) {
+                PasswordCallback.class.cast(callback).setPassword(password.toCharArray());
+                continue;
+              }
+              throw new UnsupportedCallbackException(callback);
+            }
+          }
+        });
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (authenticator != null) {
+      authenticator.close();
+    }
+  }
+
+  // used for clients < 1.10
+  public UserAuthenticator getAuthenticator() {
+    return authenticator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
new file mode 100644
index 0000000..417fca1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.plain;
+
+import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Plain SaslServer implementation.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4616">RFC for PLAIN SASL mechanism</a>
+ */
+class PlainServer implements SaslServer {
+
+  // separator between the three fields of a PLAIN response
+  private static final String UTF_8_NULL = "\u0000";
+
+  public static final String MECHANISM_NAME = "PLAIN";
+
+  private final UserAuthenticator authenticator;
+
+  private boolean completed = false;
+  private String authorizationID;
+
+  /**
+   * @param authenticator verifies the user name and password received from the client
+   * @param properties    SASL properties, may be null
+   * @throws SaslException if {@link Sasl#POLICY_NOPLAINTEXT} is set to "true" in {@code properties}
+   */
+  PlainServer(final UserAuthenticator authenticator, final Map<String, ?> properties) throws SaslException {
+    if (properties != null) {
+      if ("true".equalsIgnoreCase((String) properties.get(Sasl.POLICY_NOPLAINTEXT))) {
+        throw new SaslException("PLAIN authentication is not permitted.");
+      }
+    }
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public String getMechanismName() {
+    return MECHANISM_NAME;
+  }
+
+  /**
+   * Parses the client response, verifies the credentials and completes the exchange.
+   *
+   * @param response UTF-8 bytes of {@code authorizationID NUL authenticationID NUL password}; an empty
+   *                 authorization ID defaults to the authentication ID
+   * @return always null, since no further challenge is sent
+   * @throws SaslException         if the response is null or malformed, no authenticator is available, the
+   *                               credentials are rejected, or the two IDs differ
+   * @throws IllegalStateException if authentication has already completed
+   */
+  @Override
+  public byte[] evaluateResponse(byte[] response) throws SaslException {
+    if (completed) {
+      throw new IllegalStateException("PLAIN authentication already completed");
+    }
+
+    if (response == null) {
+      throw new SaslException("Received null response");
+    }
+
+    final String payload = new String(response, StandardCharsets.UTF_8);
+
+    // Separator defined in PlainClient is 0
+    // three parts: [ authorizationID, authenticationID, password ]
+    final String[] parts = payload.split(UTF_8_NULL, 3);
+    if (parts.length != 3) {
+      throw new SaslException("Received corrupt response. Expected 3 parts, but received "
+          + parts.length);
+    }
+    String authorizationID = parts[0];
+    final String authenticationID = parts[1];
+    final String password = parts[2];
+
+    if (authorizationID.isEmpty()) {
+      authorizationID = authenticationID;
+    }
+
+    if (authenticator == null) {
+      // Fail the exchange cleanly instead of with a NullPointerException.
+      throw new SaslException("PLAIN authentication is not available: no user authenticator is configured.");
+    }
+
+    try {
+      authenticator.authenticate(authenticationID, password);
+    } catch (final UserAuthenticationException e) {
+      // Keep the original exception as the cause so the failure can be diagnosed.
+      throw new SaslException(e.getMessage(), e);
+    }
+
+    if (!authorizationID.equals(authenticationID)) {
+      throw new SaslException("Drill expects authorization ID and authentication ID to match. " +
+          "Use inbound impersonation feature so one entity can act on behalf of another.");
+    }
+
+    this.authorizationID = authorizationID;
+    completed = true;
+    return null;
+  }
+
+  @Override
+  public boolean isComplete() {
+    return completed;
+  }
+
+  /**
+   * @return the authorization ID established by {@link #evaluateResponse(byte[])}
+   * @throws IllegalStateException if authentication has not completed
+   */
+  @Override
+  public String getAuthorizationID() {
+    if (completed) {
+      return authorizationID;
+    }
+    throw new IllegalStateException("PLAIN authentication not completed");
+  }
+
+  /**
+   * @return "auth" for {@link Sasl#QOP}, null for any other property
+   * @throws IllegalStateException if authentication has not completed
+   */
+  @Override
+  public Object getNegotiatedProperty(String propName) {
+    if (completed) {
+      return Sasl.QOP.equals(propName) ? "auth" : null;
+    }
+    throw new IllegalStateException("PLAIN authentication not completed");
+  }
+
+  /** Always throws: {@link SaslException} once completed, {@link IllegalStateException} before that. */
+  @Override
+  public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    if (completed) {
+      throw new SaslException("PLAIN supports neither integrity nor privacy");
+    } else {
+      throw new IllegalStateException("PLAIN authentication not completed");
+    }
+  }
+
+  /** Always throws: {@link SaslException} once completed, {@link IllegalStateException} before that. */
+  @Override
+  public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    if (completed) {
+      throw new SaslException("PLAIN supports neither integrity nor privacy");
+    } else {
+      throw new IllegalStateException("PLAIN authentication not completed");
+    }
+  }
+
+  /** Clears the stored authorization ID. */
+  @Override
+  public void dispose() throws SaslException {
+    authorizationID = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 0b45f52..ecf6f6a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -26,6 +26,7 @@ drill {
       org.apache.drill.exec.physical.impl.BatchCreator,
       org.apache.drill.exec.physical.impl.RootCreator,
       org.apache.drill.exec.rpc.user.security.UserAuthenticator,
+      org.apache.drill.exec.rpc.security.AuthenticatorFactory,
       org.apache.drill.exec.store.dfs.FormatPlugin,
       org.apache.drill.exec.store.StoragePlugin
     ],
@@ -36,7 +37,8 @@ drill {
       org.apache.drill.exec.expr,
       org.apache.drill.exec.physical,
       org.apache.drill.exec.store,
-      org.apache.drill.exec.rpc.user.security
+      org.apache.drill.exec.rpc.user.security,
+      org.apache.drill.exec.rpc.security
     ]
   }
 }
@@ -144,10 +146,11 @@ drill.exec: {
     max_chained_user_hops: 3
   },
   security.user.auth {
-    enabled: false,
-    impl: "pam",
-    pam_profiles: [ "sudo", "login" ]
+    enabled: false
   },
+  security.bit.auth {
+    enabled : false
+  }
   trace: {
     directory: "/tmp/drill-trace",
     filesystem: "file:///"