You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/02/25 07:18:14 UTC
[21/29] drill git commit: DRILL-4280: CORE (security package)
DRILL-4280: CORE (security package)
+ Add AuthenticatorFactory interface
+ Kerberos implementation
+ includes SaslServer and SaslClient wrappers
+ Plain implementation
+ PlainServer implements SaslServer (unavailable in Java)
for username/password based authentication
+ retrofit user authenticator
+ add logic for backward compatibility
+ Add AuthenticatorProvider interface to provide authenticator
factories, and add two implementations:
+ DrillConfig and ScanResult based AuthenticatorProviderImpl
+ Default and system property based ClientAuthenticatorProvider
+ FastSaslServerFactory caches SaslServer factories
+ FastSaslClientFactory caches SaslClient factories
+ ServerAuthenticationHandler handles authentication on server-side
+ FailingRequestHandler to fail any message received
+ AuthenticationOutcomeListener handles authentication on client-side
security
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8a732c08
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8a732c08
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8a732c08
Branch: refs/heads/master
Commit: 8a732c0827cb0bb629718622de60ce8fcf70c571
Parents: f445b08
Author: Sudheesh Katkam <su...@apache.org>
Authored: Wed Jan 25 18:57:35 2017 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Feb 24 19:01:42 2017 -0800
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 3 +
.../drill/exec/rpc/FailingRequestHandler.java | 39 +++
.../drill/exec/rpc/security/AuthStringUtil.java | 60 ++++
.../security/AuthenticationOutcomeListener.java | 246 ++++++++++++++++
.../exec/rpc/security/AuthenticatorFactory.java | 80 ++++++
.../rpc/security/AuthenticatorProvider.java | 33 +++
.../rpc/security/AuthenticatorProviderImpl.java | 141 ++++++++++
.../security/ClientAuthenticatorProvider.java | 104 +++++++
.../rpc/security/FastSaslClientFactory.java | 113 ++++++++
.../rpc/security/FastSaslServerFactory.java | 111 ++++++++
.../security/ServerAuthenticationHandler.java | 280 +++++++++++++++++++
.../rpc/security/kerberos/KerberosFactory.java | 216 ++++++++++++++
.../drill/exec/rpc/security/package-info.java | 41 +++
.../exec/rpc/security/plain/PlainFactory.java | 117 ++++++++
.../exec/rpc/security/plain/PlainServer.java | 146 ++++++++++
.../src/main/resources/drill-module.conf | 11 +-
16 files changed, 1737 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 846cd8b..b8f0c23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -108,9 +108,12 @@ public interface ExecConstants {
String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
+ String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";
String USER_AUTHENTICATION_ENABLED = "drill.exec.security.user.auth.enabled";
String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
+ String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
+ String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
/** Size of JDBC batch queue (in batches) above which throttling begins. */
String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
new file mode 100644
index 0000000..13733ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FailingRequestHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * This handler fails any request on the connection. Example use case: the peer is making requests
+ * before authenticating.
+ *
+ * @param <S> server connection type
+ */
+public class FailingRequestHandler<S extends ServerConnection<S>> implements RequestHandler<S> {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FailingRequestHandler.class);
+
+  /**
+   * Rejects the request unconditionally by throwing. Neither body buffer is read and nothing is sent
+   * through {@code sender}.
+   *
+   * @param connection connection the request arrived on; only its remote address is used (for the message)
+   * @param rpcType    numeric type of the rejected request; reported in the message
+   * @param pBody      ignored
+   * @param dBody      ignored
+   * @param sender     ignored
+   * @throws RpcException always
+   */
+  @Override
+  public void handle(S connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+      throws RpcException {
+    final String reason = String.format("Request of type %d is not yet allowed. Dropping connection to %s.",
+        rpcType, connection.getRemoteAddress());
+
+    // drops connection
+    throw new RpcException(reason);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
new file mode 100644
index 0000000..01740a5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthStringUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class AuthStringUtil {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AuthStringUtil.class);
+
+  /**
+   * Checks whether the list contains the given string, ignoring case.
+   *
+   * @param list      strings to search; a null list, and null elements in it, never match
+   * @param toCompare string to look for
+   * @return true iff some non-null element equals {@code toCompare} ignoring case
+   */
+  public static boolean listContains(final List<String> list, final String toCompare) {
+    if (list == null) {
+      return false;
+    }
+    for (final String string : list) {
+      if (string != null && string.equalsIgnoreCase(toCompare)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Converts a list of strings to a set of upper case strings.
+   *
+   * Upper-casing uses {@link java.util.Locale#ROOT} so that the result does not depend on the default locale
+   * of the JVM (e.g. under a Turkish locale, "plain" would otherwise not become "PLAIN").
+   *
+   * @param list strings to convert; may be null. Null elements are kept as null.
+   * @return a new set holding the upper-cased elements; empty if {@code list} is null
+   */
+  public static Set<String> asSet(final List<String> list) {
+    if (list == null) {
+      return Sets.newHashSet();
+    }
+    return Sets.newHashSet(Iterators.transform(list.iterator(),
+        new Function<String, String>() {
+          @Nullable
+          @Override
+          public String apply(@Nullable String input) {
+            return input == null ? null : input.toUpperCase(java.util.Locale.ROOT);
+          }
+        }));
+  }
+
+  // prevent instantiation
+  private AuthStringUtil() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
new file mode 100644
index 0000000..9c74ddc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the client-side.
+ *
+ * @param <T> handshake rpc type
+ * @param <C> Client connection type
+ * @param <HS> Handshake send type
+ * @param <HR> Handshake receive type
+ */
+public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientConnection,
+ HS extends MessageLite, HR extends MessageLite>
+ implements RpcOutcomeListener<SaslMessage> {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
+
+ // Dispatch table: status of the server's SaslMessage -> processor that handles that message.
+ // Statuses without an entry here (e.g. SASL_START) make the lookup in success() return null, which is
+ // reported to the completion listener as a corrupt message.
+ private static final ImmutableMap<SaslStatus, SaslChallengeProcessor> CHALLENGE_PROCESSORS;
+ static {
+ final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
+ map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+ map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+ map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+ CHALLENGE_PROCESSORS = Maps.immutableEnumMap(map);
+ }
+
+ private final BasicClient<T, C, HS, HR> client;
+ private final C connection;
+ private final T saslRpcType;
+ private final UserGroupInformation ugi;
+ private final RpcOutcomeListener<?> completionListener;
+
+ /**
+ * Creates a listener for one step of the SASL exchange.
+ *
+ * @param client client used to send SASL messages to the server
+ * @param connection connection the exchange runs on; its SaslClient evaluates the challenges
+ * @param saslRpcType rpc type under which SASL messages are sent
+ * @param ugi user as whom challenges are evaluated
+ * @param completionListener notified once the exchange succeeds, fails or is interrupted
+ */
+ public AuthenticationOutcomeListener(BasicClient<T, C, HS, HR> client,
+ C connection, T saslRpcType, UserGroupInformation ugi,
+ RpcOutcomeListener<?> completionListener) {
+ this.client = client;
+ this.connection = connection;
+ this.saslRpcType = saslRpcType;
+ this.ugi = ugi;
+ this.completionListener = completionListener;
+ }
+
+ /**
+  * Sends the first message (status SASL_START) of the SASL exchange to the server.
+  *
+  * If the connection's SaslClient has an initial response, it is computed from an empty challenge and sent
+  * as the message data; otherwise the data is empty. Any failure is reported to the completion listener
+  * instead of being thrown.
+  *
+  * @param mechanismName name of the mechanism, as sent to the server
+  */
+ public void initiate(final String mechanismName) {
+   logger.trace("Initiating SASL exchange.");
+   try {
+     final SaslClient saslClient = connection.getSaslClient();
+     final ByteString initialResponse = saslClient.hasInitialResponse()
+         ? ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0]))
+         : ByteString.EMPTY;
+
+     final SaslMessage startMessage = SaslMessage.newBuilder()
+         .setMechanism(mechanismName)
+         .setStatus(SaslStatus.SASL_START)
+         .setData(initialResponse)
+         .build();
+
+     client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+         connection, saslRpcType, startMessage, SaslMessage.class,
+         true /* the connection will not be backed up at this point */);
+     logger.trace("Initiated SASL exchange.");
+   } catch (final Exception e) {
+     completionListener.failed(RpcException.mapException(e));
+   }
+ }
+
+ /**
+ * Forwards a failed SASL rpc to the completion listener.
+ *
+ * @param ex failure reported for the rpc
+ */
+ @Override
+ public void failed(RpcException ex) {
+ completionListener.failed(RpcException.mapException(ex));
+ }
+
+ /**
+  * Handles a SASL message from the server.
+  *
+  * The message is dispatched on its status. If the processor produces a response, it is sent to the server
+  * with a fresh listener; if it produces none, the exchange is complete and the completion listener is told
+  * so. A status without a processor, and any exception, are reported to the completion listener as failure.
+  *
+  * @param value  message from the server
+  * @param buffer not used
+  */
+ @Override
+ public void success(SaslMessage value, ByteBuf buffer) {
+   logger.trace("Server responded with message of type: {}", value.getStatus());
+   final SaslChallengeProcessor handler = CHALLENGE_PROCESSORS.get(value.getStatus());
+   if (handler == null) {
+     completionListener.failed(RpcException.mapException(
+         new SaslException("Server sent a corrupt message.")));
+     return;
+   }
+
+   try {
+     final SaslMessage reply =
+         handler.process(new SaslChallengeContext(value, connection.getSaslClient(), ugi));
+
+     if (reply == null) {
+       // success
+       completionListener.success(null, null);
+       return;
+     }
+
+     client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+         connection, saslRpcType, reply, SaslMessage.class,
+         true /* the connection will not be backed up at this point */);
+   } catch (final Exception e) {
+     completionListener.failed(RpcException.mapException(e));
+   }
+ }
+
+ /**
+ * Forwards the interruption to the completion listener unchanged.
+ *
+ * @param e the interruption
+ */
+ @Override
+ public void interrupted(InterruptedException e) {
+ completionListener.interrupted(e);
+ }
+
+ /**
+ * Bundles what a {@link SaslChallengeProcessor} needs to handle one message from the server. All three
+ * values are required; the constructor rejects null for any of them.
+ */
+ private static class SaslChallengeContext {
+
+ final SaslMessage challenge; // message received from the server
+ final SaslClient saslClient; // client that evaluates the challenge
+ final UserGroupInformation ugi; // user as whom the challenge is evaluated
+
+ SaslChallengeContext(SaslMessage challenge, SaslClient saslClient, UserGroupInformation ugi) {
+ this.challenge = checkNotNull(challenge);
+ this.saslClient = checkNotNull(saslClient);
+ this.ugi = checkNotNull(ugi);
+ }
+ }
+
+ /**
+ * Strategy for handling a server message of one particular status; see CHALLENGE_PROCESSORS for the
+ * status to processor mapping.
+ */
+ private interface SaslChallengeProcessor {
+
+ /**
+ * Process challenge from server, and return a response.
+ *
+ * Returns null iff SASL exchange is complete and successful.
+ *
+ * @param context challenge context
+ * @return response to send to the server, or null if there is nothing more to send
+ * @throws Exception if the exchange cannot continue; the caller reports it to the completion listener
+ */
+ SaslMessage process(SaslChallengeContext context) throws Exception;
+
+ }
+
+ /**
+  * Handles a SASL_IN_PROGRESS message: evaluates the server's challenge and builds the response carrying the
+  * evaluated bytes (empty if there are none).
+  */
+ private static class SaslInProgressProcessor implements SaslChallengeProcessor {
+
+   @Override
+   public SaslMessage process(SaslChallengeContext context) throws Exception {
+     final byte[] evaluated = evaluateChallenge(context.ugi, context.saslClient,
+         context.challenge.getData().toByteArray());
+
+     final boolean done = context.saslClient.isComplete();
+     logger.trace("Evaluated challenge. Completed? {}.", done);
+
+     final ByteString payload = evaluated == null ? ByteString.EMPTY : ByteString.copyFrom(evaluated);
+     // if done, the client will get one more response from server
+     final SaslStatus status = done ? SaslStatus.SASL_SUCCESS : SaslStatus.SASL_IN_PROGRESS;
+
+     return SaslMessage.newBuilder()
+         .setData(payload)
+         .setStatus(status)
+         .build();
+   }
+ }
+
+ /**
+  * Handles a SASL_SUCCESS message. The exchange is accepted only if the client side is complete as well,
+  * either already or after evaluating the data of this last message once.
+  */
+ private static class SaslSuccessProcessor implements SaslChallengeProcessor {
+
+   @Override
+   public SaslMessage process(SaslChallengeContext context) throws Exception {
+     final SaslClient sasl = context.saslClient;
+
+     if (!sasl.isComplete()) {
+       // server completed before client; so try once, fail otherwise
+       evaluateChallenge(context.ugi, sasl, context.challenge.getData().toByteArray()); // discard response
+
+       if (!sasl.isComplete()) {
+         throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
+       }
+     }
+
+     logger.trace("Successfully authenticated to server using {}", sasl.getMechanismName());
+     // setup security layers here..
+     return null;
+   }
+ }
+
+ /**
+ * Handles a SASL_FAILED message: always throws, so that the caller reports the failure to the completion
+ * listener. The contents of the message are not inspected.
+ */
+ private static class SaslFailedProcessor implements SaslChallengeProcessor {
+
+ @Override
+ public SaslMessage process(SaslChallengeContext context) throws Exception {
+ throw new SaslException("Authentication failed. Incorrect credentials?");
+ }
+ }
+
+ /**
+  * Evaluates a challenge with the given SASL client, running as the given user.
+  *
+  * @param ugi            user as whom the evaluation runs
+  * @param saslClient     client that evaluates the challenge
+  * @param challengeBytes challenge received from the server
+  * @return the bytes produced by {@link SaslClient#evaluateChallenge}
+  * @throws SaslException if the evaluation fails. A SaslException raised by the evaluation is rethrown
+  *                       as is; any other failure is wrapped, with the mechanism name in the message.
+  */
+ private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient,
+     final byte[] challengeBytes) throws SaslException {
+   try {
+     return ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+       @Override
+       public byte[] run() throws Exception {
+         return saslClient.evaluateChallenge(challengeBytes);
+       }
+     });
+   } catch (final UndeclaredThrowableException e) {
+     // doAs wraps exceptions it does not declare; surface the underlying cause
+     throw new SaslException(
+         String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e.getCause());
+   } catch (final SaslException e) {
+     // already the type callers expect (must be caught before its supertype IOException)
+     throw e;
+   } catch (final InterruptedException e) {
+     // the interruption is converted to a SaslException, so restore the interrupt status for code
+     // further up the stack that checks it
+     Thread.currentThread().interrupt();
+     throw new SaslException(
+         String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e);
+   } catch (final IOException e) {
+     throw new SaslException(
+         String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
new file mode 100644
index 0000000..307ae97
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An implementation of this factory will be initialized once at startup, if the authenticator is enabled
+ * (see {@link #getSimpleName}). For every request for this mechanism (i.e. after establishing a connection),
+ * {@link #createSaslServer} will be invoked on the server-side and {@link #createSaslClient} will be invoked
+ * on the client-side.
+ *
+ * Note:
+ * + Custom authenticators must have a default constructor.
+ *
+ * Examples: PlainFactory and KerberosFactory.
+ */
+public interface AuthenticatorFactory extends AutoCloseable {
+
+ /**
+ * Name of the mechanism, in upper case.
+ *
+ * If this mechanism is present in the list of enabled mechanisms, an instance of this factory is loaded. Note
+ * that the simple name may be the same as its SASL name.
+ *
+ * @return mechanism name
+ */
+ String getSimpleName();
+
+ /**
+ * Create and get the login user based on the given properties.
+ *
+ * @param properties config properties
+ * @return ugi
+ * @throws IOException declared so that implementations can report a failure to create or log in the user
+ */
+ UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException;
+
+ /**
+ * The caller is responsible for {@link SaslServer#dispose disposing} the returned SaslServer.
+ *
+ * @param ugi ugi
+ * @param properties config properties
+ * @return sasl server
+ * @throws SaslException declared so that implementations can report a failure to create the server
+ */
+ SaslServer createSaslServer(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+ /**
+ * The caller is responsible for {@link SaslClient#dispose disposing} the returned SaslClient.
+ *
+ * @param ugi ugi
+ * @param properties config properties
+ * @return sasl client
+ * @throws SaslException declared so that implementations can report a failure to create the client
+ */
+ SaslClient createSaslClient(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
new file mode 100644
index 0000000..66ed98f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.SaslException;
+import java.util.Set;
+
+/**
+ * Registry of {@link AuthenticatorFactory authenticator factories}, looked up by mechanism name.
+ */
+public interface AuthenticatorProvider extends AutoCloseable {
+
+ /**
+ * Get the factory registered under the given name.
+ *
+ * @param name mechanism name
+ * @return authenticator factory
+ * @throws SaslException declared for lookups that fail, e.g. AuthenticatorProviderImpl throws it when no
+ * factory is registered under the name
+ */
+ AuthenticatorFactory getAuthenticatorFactory(String name) throws SaslException;
+
+ /**
+ * @return names of all registered factories
+ */
+ Set<String> getAllFactoryNames();
+
+ /**
+ * @param name mechanism name
+ * @return true iff a factory is registered under the given name
+ */
+ boolean containsFactory(String name);
+
+ @Override
+ void close() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
new file mode 100644
index 0000000..f4c60e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticatorFactory;
+
+import javax.security.sasl.SaslException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AuthenticatorProviderImpl implements AuthenticatorProvider {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(AuthenticatorProviderImpl.class);
+
+ // Mapping: simple name -> authenticator factory
+ private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
+
+ /**
+  * Loads the authenticator factories that are enabled in the given config.
+  *
+  * The enabled mechanisms are read from {@link ExecConstants#AUTHENTICATION_MECHANISMS} (compared in upper
+  * case); PLAIN is enabled in addition whenever {@link ExecConstants#USER_AUTHENTICATOR_IMPL} is set. If
+  * nothing is enabled, the provider stays empty. PLAIN is built from the configured UserAuthenticator; all
+  * other mechanisms are matched by simple name against the AuthenticatorFactory implementations found in the
+  * scan, each of which is instantiated through its public no-argument constructor.
+  *
+  * @param config drill config
+  * @param scan   classpath scan used to find AuthenticatorFactory implementations
+  * @throws DrillbitStartupException if a factory cannot be instantiated, or if mechanisms are enabled but
+  *                                  none of them could be loaded
+  */
+ public AuthenticatorProviderImpl(final DrillConfig config, final ScanResult scan) throws DrillbitStartupException {
+   List<String> configuredFactories = Lists.newArrayList();
+   if (config.hasPath(ExecConstants.AUTHENTICATION_MECHANISMS)) {
+     configuredFactories = config.getStringList(ExecConstants.AUTHENTICATION_MECHANISMS);
+   }
+
+   final Set<String> configuredFactoriesSet = AuthStringUtil.asSet(configuredFactories);
+   // to ensure backward compatibility of PLAIN config
+   if (config.hasPath(ExecConstants.USER_AUTHENTICATOR_IMPL)) {
+     configuredFactoriesSet.add(PlainFactory.SIMPLE_NAME);
+   }
+   if (configuredFactoriesSet.isEmpty()) {
+     return;
+   }
+
+   // log the effective set, which may include PLAIN added above, rather than the raw config list
+   logger.debug("Configuring authenticator factories: {}", configuredFactoriesSet);
+   // PLAIN mechanism need special handling due to UserAuthenticator
+   if (configuredFactoriesSet.remove(PlainFactory.SIMPLE_NAME)) {
+     // instantiated here, but closed in PlainFactory#close
+     final UserAuthenticator userAuthenticator = UserAuthenticatorFactory.createAuthenticator(config, scan);
+     final PlainFactory factory = new PlainFactory(userAuthenticator);
+     authFactories.put(PlainFactory.SIMPLE_NAME, factory);
+     logger.trace("Plain mechanism enabled.");
+   }
+
+   // Then, load other authentication factories, if any
+   if (!configuredFactoriesSet.isEmpty()) {
+     final Collection<Class<? extends AuthenticatorFactory>> factoryImpls =
+         scan.getImplementations(AuthenticatorFactory.class);
+     logger.debug("Found AuthenticatorFactory implementations: {}", factoryImpls);
+
+     for (final Class<? extends AuthenticatorFactory> clazz : factoryImpls) {
+       // typed lookup of the public no-argument constructor; avoids an unchecked cast
+       final Constructor<? extends AuthenticatorFactory> validConstructor;
+       try {
+         validConstructor = clazz.getConstructor();
+       } catch (final NoSuchMethodException e) {
+         logger.warn("Skipping authentication factory class {}. It must implement at least one constructor " +
+             "with signature [{}()]", clazz.getCanonicalName(), clazz.getName());
+         continue;
+       }
+
+       try {
+         final AuthenticatorFactory instance = validConstructor.newInstance();
+         // Locale.ROOT keeps the comparison independent of the default locale of the JVM
+         if (configuredFactoriesSet.remove(instance.getSimpleName().toUpperCase(java.util.Locale.ROOT))) {
+           authFactories.put(instance.getSimpleName(), instance);
+         }
+       } catch (IllegalArgumentException | IllegalAccessException |
+           InstantiationException | InvocationTargetException e) {
+         throw new DrillbitStartupException(
+             String.format("Failed to create authentication factory of type '%s'",
+                 clazz.getCanonicalName()), e);
+       }
+     }
+
+     // whatever is left was configured, but no implementation claimed it; make the misconfiguration visible
+     if (!configuredFactoriesSet.isEmpty()) {
+       logger.warn("No AuthenticatorFactory implementation found for configured mechanism(s): {}",
+           configuredFactoriesSet);
+     }
+   }
+
+   if (authFactories.size() == 0) {
+     throw new DrillbitStartupException("Authentication enabled, but no mechanism was configured correctly. " +
+         "Please check authentication configuration.");
+   }
+   logger.info("Configured authentication mechanisms: {}", authFactories.keySet());
+ }
+
+ /**
+  * Looks up the factory registered under the given mechanism name.
+  *
+  * @param name mechanism name
+  * @return the registered factory, never null
+  * @throws SaslException if no factory is registered under the name; the message lists the registered names
+  */
+ @Override
+ public AuthenticatorFactory getAuthenticatorFactory(final String name) throws SaslException {
+   final AuthenticatorFactory factory = authFactories.get(name);
+   if (factory != null) {
+     return factory;
+   }
+   throw new SaslException(String.format("Unknown mechanism: '%s' Configured mechanisms: %s",
+       name, authFactories.keySet()));
+ }
+
+ /**
+ * @return names under which factories are registered
+ */
+ // NOTE(review): this returns the key set of the internal map itself, not a copy, so a caller that modifies
+ // the returned set would presumably modify the registry as well - confirm that no caller does.
+ @Override
+ public Set<String> getAllFactoryNames() {
+ return authFactories.keySet();
+ }
+
+ /**
+ * @param name mechanism name
+ * @return true iff a factory is registered under the given name
+ */
+ // NOTE(review): the registry is created through CaseInsensitiveMap, so the lookup presumably ignores the
+ // case of the name - confirm against CaseInsensitiveMap.
+ @Override
+ public boolean containsFactory(final String name) {
+ return authFactories.containsKey(name);
+ }
+
+ /**
+ * Hands all registered factories to {@link AutoCloseables#close} and then empties the registry.
+ *
+ * @throws Exception if closing the factories fails; in that case the registry is not emptied
+ */
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(authFactories.values());
+ authFactories.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
new file mode 100644
index 0000000..bdcbcf5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.rpc.security.kerberos.KerberosFactory;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+
+import javax.security.sasl.SaslException;
+import java.util.Map;
+import java.util.Set;
+
+ /**
+  * {@link AuthenticatorProvider} used on the client side.
+  * <p>
+  * The Kerberos and Plain factories are always registered. Additional factories can be supplied
+  * through the {@code drill.customAuthFactories} system property, a comma-separated list of class
+  * names; each named class that implements {@link AuthenticatorFactory} is instantiated through its
+  * no-argument constructor and registered under its simple name.
+  */
+ public class ClientAuthenticatorProvider implements AuthenticatorProvider {
+   private static final org.slf4j.Logger logger =
+       org.slf4j.LoggerFactory.getLogger(ClientAuthenticatorProvider.class);
+
+   // comma-separated class names of custom factories; read once, when this class is initialized
+   private static final String customFactories = System.getProperty("drill.customAuthFactories");
+
+   // holder idiom: the shared instance is created when Holder is first initialized
+   private static final class Holder {
+     static final ClientAuthenticatorProvider INSTANCE = new ClientAuthenticatorProvider();
+
+     // prevent instantiation
+     private Holder() {
+     }
+   }
+
+   /**
+    * @return the shared provider instance
+    */
+   public static ClientAuthenticatorProvider getInstance() {
+     return Holder.INSTANCE;
+   }
+
+   // Mapping: simple name -> authenticator factory
+   private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
+
+   /**
+    * Registers the factories provided by Drill, then the custom factories named in the
+    * {@code drill.customAuthFactories} system property. A custom factory registered under the same
+    * simple name as an earlier one replaces it.
+    *
+    * @throws DrillRuntimeException if a custom factory class cannot be found or instantiated
+    */
+   public ClientAuthenticatorProvider() {
+     // factories provided by Drill
+     final KerberosFactory kerberosFactory = new KerberosFactory();
+     authFactories.put(kerberosFactory.getSimpleName(), kerberosFactory);
+     final PlainFactory plainFactory = new PlainFactory();
+     authFactories.put(plainFactory.getSimpleName(), plainFactory);
+
+     // then, custom factories
+     if (customFactories != null) {
+       try {
+         for (final String entry : customFactories.split(",")) {
+           // tolerate whitespace around names and stray commas, e.g. "a.B, c.D,"
+           final String factory = entry.trim();
+           if (factory.isEmpty()) {
+             continue;
+           }
+           final Class<?> clazz = Class.forName(factory);
+           if (AuthenticatorFactory.class.isAssignableFrom(clazz)) {
+             final AuthenticatorFactory instance = (AuthenticatorFactory) clazz.newInstance();
+             authFactories.put(instance.getSimpleName(), instance);
+           } else {
+             // make the misconfiguration visible instead of dropping the entry silently
+             logger.warn("Ignoring custom auth factory '{}' since it does not implement {}",
+                 factory, AuthenticatorFactory.class.getName());
+           }
+         }
+       } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+         throw new DrillRuntimeException("Failed to create auth factory.", e);
+       }
+     }
+
+     if (logger.isDebugEnabled()) {
+       logger.debug("Configured mechanisms: {}", authFactories.keySet());
+     }
+   }
+
+   /**
+    * Looks up the factory registered under the given mechanism name.
+    *
+    * @param name mechanism name
+    * @return the registered factory; never null
+    * @throws SaslException if no factory is registered under {@code name}
+    */
+   @Override
+   public AuthenticatorFactory getAuthenticatorFactory(final String name) throws SaslException {
+     final AuthenticatorFactory mechanism = authFactories.get(name);
+     if (mechanism == null) {
+       throw new SaslException(String.format("Unknown mechanism: '%s' Configured mechanisms: %s",
+           name, authFactories.keySet()));
+     }
+     return mechanism;
+   }
+
+   /**
+    * @return the names of all registered factories (the key view of the internal map)
+    */
+   @Override
+   public Set<String> getAllFactoryNames() {
+     return authFactories.keySet();
+   }
+
+   /**
+    * @param name mechanism name
+    * @return true if a factory is registered under {@code name}
+    */
+   @Override
+   public boolean containsFactory(final String name) {
+     return authFactories.containsKey(name);
+   }
+
+   /**
+    * Closes all registered factories, then removes them from this provider.
+    *
+    * @throws Exception if closing the factories fails; the map is not cleared in that case
+    */
+   @Override
+   public void close() throws Exception {
+     AutoCloseables.close(authFactories.values());
+     authFactories.clear();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
new file mode 100644
index 0000000..c8699b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslClientFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Sasl#createSaslClient} is known to be slow. This class caches available client factories.
+ */
+public class FastSaslClientFactory implements SaslClientFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FastSaslClientFactory.class);
+
+ // lazy initialization; all relevant providers should have registered with Security so that
+ // Sasl#getSaslClientFactories returns the latest possible list of SaslClient factories
+ private static final class Holder {
+ static final FastSaslClientFactory INSTANCE = new FastSaslClientFactory();
+
+ // prevent instantiation
+ private Holder() {
+ }
+ }
+
+ public static FastSaslClientFactory getInstance() {
+ return Holder.INSTANCE;
+ }
+
+ // package private
+ @VisibleForTesting
+ static void reload() {
+ getInstance().refresh();
+ }
+
+ // non-final for testing purposes
+ private ImmutableMap<String, List<SaslClientFactory>> clientFactories;
+
+ // prevent instantiation
+ private FastSaslClientFactory() {
+ refresh();
+ }
+
+ // used in initialization, and for testing
+ private void refresh() {
+ final Enumeration<SaslClientFactory> factories = Sasl.getSaslClientFactories();
+ final Map<String, List<SaslClientFactory>> map = Maps.newHashMap();
+
+ while (factories.hasMoreElements()) {
+ final SaslClientFactory factory = factories.nextElement();
+ // Passing null so factory is populated with all possibilities. Properties passed when
+ // instantiating a client are what really matter. See createSaslClient.
+ for (final String mechanismName : factory.getMechanismNames(null)) {
+ if (!map.containsKey(mechanismName)) {
+ map.put(mechanismName, new ArrayList<SaslClientFactory>());
+ }
+ map.get(mechanismName).add(factory);
+ }
+ }
+
+ clientFactories = ImmutableMap.copyOf(map);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Registered sasl client factories: {}", clientFactories.keySet());
+ }
+ }
+
+ @Override
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName,
+ Map<String, ?> props, CallbackHandler cbh) throws SaslException {
+ for (final String mechanism : mechanisms) {
+ final List<SaslClientFactory> factories = clientFactories.get(mechanism);
+ if (factories != null) {
+ for (final SaslClientFactory factory : factories) {
+ final SaslClient saslClient = factory.createSaslClient(new String[]{mechanism}, authorizationId, protocol,
+ serverName, props, cbh);
+ if (saslClient != null) {
+ return saslClient;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String[] getMechanismNames(final Map<String, ?> props) {
+ return clientFactories.keySet().toArray(new String[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
new file mode 100644
index 0000000..0fe15af
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/FastSaslServerFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link Sasl#createSaslServer} is known to be slow. This class caches available server factories.
+ * This is a modified version of Apache Hadoop's implementation.
+ */
+public final class FastSaslServerFactory implements SaslServerFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FastSaslServerFactory.class);
+
+ // lazy initialization; all relevant providers should have registered with Security so that
+ // Sasl#getSaslServerFactories returns the latest possible list of SaslServer factories
+ private static final class Holder {
+ static final FastSaslServerFactory INSTANCE = new FastSaslServerFactory();
+
+ // prevent instantiation
+ private Holder() {
+ }
+ }
+
+ public static FastSaslServerFactory getInstance() {
+ return Holder.INSTANCE;
+ }
+
+ // package private
+ @VisibleForTesting
+ static void reload() {
+ getInstance().refresh();
+ }
+
+ // non-final for testing purposes
+ private ImmutableMap<String, List<SaslServerFactory>> serverFactories;
+
+ // prevent instantiation
+ private FastSaslServerFactory() {
+ refresh();
+ }
+
+ // used in initialization, and for testing
+ private void refresh() {
+ final Enumeration<SaslServerFactory> factories = Sasl.getSaslServerFactories();
+ final Map<String, List<SaslServerFactory>> map = Maps.newHashMap();
+
+ while (factories.hasMoreElements()) {
+ final SaslServerFactory factory = factories.nextElement();
+ // Passing null so factory is populated with all possibilities. Properties passed when
+ // instantiating a server are what really matter. See createSaslServer.
+ for (final String mechanismName : factory.getMechanismNames(null)) {
+ if (!map.containsKey(mechanismName)) {
+ map.put(mechanismName, new ArrayList<SaslServerFactory>());
+ }
+ map.get(mechanismName).add(factory);
+ }
+ }
+
+ serverFactories = ImmutableMap.copyOf(map);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Registered sasl server factories: {}", serverFactories.keySet());
+ }
+ }
+
+ @Override
+ public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
+ CallbackHandler cbh) throws SaslException {
+ final List<SaslServerFactory> factories = serverFactories.get(mechanism);
+ if (factories != null) {
+ for (final SaslServerFactory factory : factories) {
+ final SaslServer saslServer = factory.createSaslServer(mechanism, protocol, serverName, props, cbh);
+ if (saslServer != null) {
+ return saslServer;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String[] getMechanismNames(final Map<String, ?> props) {
+ return serverFactories.keySet().toArray(new String[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
new file mode 100644
index 0000000..bf34d57
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.ServerConnection;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the server-side.
+ *
+ * @param <S> Server connection type
+ * @param <T> RPC type
+ */
+public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extends EnumLite>
+ implements RequestHandler<S> {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(ServerAuthenticationHandler.class);
+
+ private static final ImmutableMap<SaslStatus, SaslResponseProcessor> RESPONSE_PROCESSORS;
+
+ static {
+ final Map<SaslStatus, SaslResponseProcessor> map = new EnumMap<>(SaslStatus.class);
+ map.put(SaslStatus.SASL_START, new SaslStartProcessor());
+ map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+ map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+ map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+ RESPONSE_PROCESSORS = Maps.immutableEnumMap(map);
+ }
+
+ private final RequestHandler<S> requestHandler;
+ private final int saslRequestTypeValue;
+ private final T saslResponseType;
+
+ public ServerAuthenticationHandler(final RequestHandler<S> requestHandler, final int saslRequestTypeValue,
+ final T saslResponseType) {
+ this.requestHandler = requestHandler;
+ this.saslRequestTypeValue = saslRequestTypeValue;
+ this.saslResponseType = saslResponseType;
+ }
+
+ @Override
+ public void handle(S connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender)
+ throws RpcException {
+ final String remoteAddress = connection.getRemoteAddress().toString();
+
+ // exchange involves server "challenges" and client "responses" (initiated by client)
+ if (saslRequestTypeValue == rpcType) {
+ final SaslMessage saslResponse;
+ try {
+ saslResponse = SaslMessage.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ } catch (final InvalidProtocolBufferException e) {
+ handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+ return;
+ }
+
+ logger.trace("Received SASL message {} from {}", saslResponse.getStatus(), remoteAddress);
+ final SaslResponseProcessor processor = RESPONSE_PROCESSORS.get(saslResponse.getStatus());
+ if (processor == null) {
+ logger.info("Unknown message type from client from {}. Will stop authentication.", remoteAddress);
+ handleAuthFailure(remoteAddress, sender, new SaslException("Received unexpected message"),
+ saslResponseType);
+ return;
+ }
+
+ final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, remoteAddress,
+ sender, requestHandler, saslResponseType);
+ try {
+ processor.process(context);
+ } catch (final Exception e) {
+ handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+ }
+ } else {
+
+ // this handler only handles messages of SASL_MESSAGE_VALUE type
+
+ // the response type for this request type is likely known from UserRpcConfig,
+ // but the client should not be making any requests before authenticating.
+ // drop connection
+ throw new RpcException(
+ String.format("Request of type %d is not allowed without authentication. " +
+ "Client on %s must authenticate before making requests. Connection dropped.",
+ rpcType, remoteAddress));
+ }
+ }
+
+ private static class SaslResponseContext<S extends ServerConnection<S>, T extends EnumLite> {
+
+ final SaslMessage saslResponse;
+ final S connection;
+ final String remoteAddress;
+ final ResponseSender sender;
+ final RequestHandler<S> requestHandler;
+ final T saslResponseType;
+
+ SaslResponseContext(SaslMessage saslResponse, S connection, String remoteAddress, ResponseSender sender,
+ RequestHandler<S> requestHandler, T saslResponseType) {
+ this.saslResponse = checkNotNull(saslResponse);
+ this.connection = checkNotNull(connection);
+ this.remoteAddress = checkNotNull(remoteAddress);
+ this.sender = checkNotNull(sender);
+ this.requestHandler = checkNotNull(requestHandler);
+ this.saslResponseType = checkNotNull(saslResponseType);
+ }
+ }
+
+ private interface SaslResponseProcessor {
+
+ /**
+ * Process response from client, and if there are no exceptions, send response using
+ * {@link SaslResponseContext#sender}. Otherwise, throw the exception.
+ *
+ * @param context response context
+ */
+ <S extends ServerConnection<S>, T extends EnumLite>
+ void process(SaslResponseContext<S, T> context) throws Exception;
+
+ }
+
+ private static class SaslStartProcessor implements SaslResponseProcessor {
+
+ @Override
+ public <S extends ServerConnection<S>, T extends EnumLite>
+ void process(SaslResponseContext<S, T> context) throws Exception {
+ context.connection.initSaslServer(context.saslResponse.getMechanism());
+
+ // assume #evaluateResponse must be called at least once
+ RESPONSE_PROCESSORS.get(SaslStatus.SASL_IN_PROGRESS).process(context);
+ }
+ }
+
+ private static class SaslInProgressProcessor implements SaslResponseProcessor {
+
+ @Override
+ public <S extends ServerConnection<S>, T extends EnumLite>
+ void process(SaslResponseContext<S, T> context) throws Exception {
+ final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+ final SaslServer saslServer = context.connection.getSaslServer();
+
+ final byte[] challengeBytes = evaluateResponse(saslServer, context.saslResponse.getData().toByteArray());
+
+ if (saslServer.isComplete()) {
+ challenge.setStatus(SaslStatus.SASL_SUCCESS);
+ if (challengeBytes != null) {
+ challenge.setData(ByteString.copyFrom(challengeBytes));
+ }
+
+ handleSuccess(context, challenge, saslServer);
+ } else {
+ challenge.setStatus(SaslStatus.SASL_IN_PROGRESS)
+ .setData(ByteString.copyFrom(challengeBytes));
+ context.sender.send(new Response(context.saslResponseType, challenge.build()));
+ }
+ }
+ }
+
+ // only when client succeeds first
+ private static class SaslSuccessProcessor implements SaslResponseProcessor {
+
+ @Override
+ public <S extends ServerConnection<S>, T extends EnumLite>
+ void process(SaslResponseContext<S, T> context) throws Exception {
+ // at this point, #isComplete must be false; so try once, fail otherwise
+ final SaslServer saslServer = context.connection.getSaslServer();
+
+ evaluateResponse(saslServer, context.saslResponse.getData().toByteArray()); // discard challenge
+
+ if (saslServer.isComplete()) {
+ final SaslMessage.Builder challenge = SaslMessage.newBuilder();
+ challenge.setStatus(SaslStatus.SASL_SUCCESS);
+
+ handleSuccess(context, challenge, saslServer);
+ } else {
+ logger.info("Failed to authenticate client from {}", context.remoteAddress);
+ throw new SaslException("Client allegedly succeeded authentication, but server did not. Suspicious?");
+ }
+ }
+ }
+
+ private static class SaslFailedProcessor implements SaslResponseProcessor {
+
+ @Override
+ public <S extends ServerConnection<S>, T extends EnumLite>
+ void process(SaslResponseContext<S, T> context) throws Exception {
+ logger.info("Client from {} failed authentication graciously, and does not want to continue.",
+ context.remoteAddress);
+ throw new SaslException("Client graciously failed authentication");
+ }
+ }
+
+ private static byte[] evaluateResponse(final SaslServer saslServer,
+ final byte[] responseBytes) throws SaslException {
+ try {
+ return UserGroupInformation.getLoginUser()
+ .doAs(new PrivilegedExceptionAction<byte[]>() {
+ @Override
+ public byte[] run() throws Exception {
+ return saslServer.evaluateResponse(responseBytes);
+ }
+ });
+ } catch (final UndeclaredThrowableException e) {
+ throw new SaslException(String.format("Unexpected failure trying to authenticate using %s",
+ saslServer.getMechanismName()), e.getCause());
+ } catch (final IOException | InterruptedException e) {
+ if (e instanceof SaslException) {
+ throw (SaslException) e;
+ } else {
+ throw new SaslException(String.format("Unexpected failure trying to authenticate using %s",
+ saslServer.getMechanismName()), e);
+ }
+ }
+ }
+
+ private static <S extends ServerConnection<S>, T extends EnumLite>
+ void handleSuccess(final SaslResponseContext<S, T> context, final SaslMessage.Builder challenge,
+ final SaslServer saslServer) throws IOException {
+ context.connection.changeHandlerTo(context.requestHandler);
+ context.connection.finalizeSaslSession();
+ context.sender.send(new Response(context.saslResponseType, challenge.build()));
+
+ // setup security layers here..
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Authenticated {} successfully using {} from {}", saslServer.getAuthorizationID(),
+ saslServer.getMechanismName(), context.remoteAddress);
+ }
+ }
+
+ private static final SaslMessage SASL_FAILED_MESSAGE =
+ SaslMessage.newBuilder().setStatus(SaslStatus.SASL_FAILED).build();
+
+ private static <T extends EnumLite>
+ void handleAuthFailure(final String remoteAddress, final ResponseSender sender,
+ final Exception e, final T saslResponseType) throws RpcException {
+ logger.debug("Authentication failed from client {} due to {}", remoteAddress, e);
+
+ // inform the client that authentication failed, and no more
+ sender.send(new Response(saslResponseType, SASL_FAILED_MESSAGE));
+
+ // drop connection
+ throw new RpcException(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
new file mode 100644
index 0000000..855dd8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.kerberos;
+
+import org.apache.drill.common.KerberosUtil;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.FastSaslClientFactory;
+import org.apache.drill.exec.rpc.security.FastSaslServerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+ /**
+  * {@link AuthenticatorFactory} for Kerberos. It logs users in through Hadoop's
+  * {@link UserGroupInformation} and creates SASL servers and clients for the mechanism named by
+  * {@link KerberosUtil#KERBEROS_SASL_NAME}.
+  */
+ public class KerberosFactory implements AuthenticatorFactory {
+   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KerberosFactory.class);
+
+   // primary part of the service principal, used when connection properties do not name one
+   private static final String DRILL_SERVICE_NAME = System.getProperty("drill.principal.primary", "drill");
+
+   @Override
+   public String getSimpleName() {
+     return KerberosUtil.KERBEROS_SIMPLE_NAME;
+   }
+
+   /**
+    * Switches Hadoop security to Kerberos and obtains the user to authenticate as. Depending on the
+    * properties, the user is taken from the current JAAS subject, logged in from a keytab, or is
+    * the current user (which includes Kerberos ticket login).
+    *
+    * @param properties connection properties; {@link DrillProperties#KERBEROS_FROM_SUBJECT},
+    *                   {@link DrillProperties#KEYTAB} and {@link DrillProperties#USER} are consulted
+    * @return the resolved user
+    * @throws IOException a {@link SaslException} if the login fails
+    */
+   @Override
+   public UserGroupInformation createAndLoginUser(final Map<String, ?> properties) throws IOException {
+     final Configuration conf = new Configuration();
+     conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+         UserGroupInformation.AuthenticationMethod.KERBEROS.toString());
+     UserGroupInformation.setConfiguration(conf);
+
+     final String keytab = (String) properties.get(DrillProperties.KEYTAB);
+     final boolean assumeSubject = properties.containsKey(DrillProperties.KERBEROS_FROM_SUBJECT) &&
+         Boolean.parseBoolean((String) properties.get(DrillProperties.KERBEROS_FROM_SUBJECT));
+     try {
+       final UserGroupInformation ugi;
+       if (assumeSubject) {
+         ugi = UserGroupInformation.getUGIFromSubject(Subject.getSubject(AccessController.getContext()));
+         logger.debug("Assuming subject for {}.", ugi.getShortUserName());
+       } else {
+         if (keytab != null) {
+           ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+               (String) properties.get(DrillProperties.USER), keytab);
+           logger.debug("Logged in {} using keytab.", ugi.getShortUserName());
+         } else {
+           // includes Kerberos ticket login
+           ugi = UserGroupInformation.getCurrentUser();
+           logger.debug("Logged in {} using ticket.", ugi.getShortUserName());
+         }
+       }
+       return ugi;
+     } catch (final IOException e) {
+       logger.debug("Login failed.", e);
+       final Throwable cause = e.getCause();
+       if (cause instanceof LoginException) {
+         throw new SaslException("Failed to login.", cause);
+       }
+       // fall back to the exception itself when it carries no cause, so the reason is not lost
+       throw new SaslException("Unexpected failure trying to login.", cause != null ? cause : e);
+     }
+   }
+
+   /**
+    * Creates a SASL server for the Kerberos mechanism, running as the given user. The protocol and
+    * server name passed to the SASL factory are derived from the user's principal.
+    *
+    * @param ugi        user the server runs as
+    * @param properties properties handed to the SASL server factory
+    * @return the server returned by {@link FastSaslServerFactory}
+    * @throws SaslException if the server cannot be created
+    */
+   @Override
+   public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
+       throws SaslException {
+     try {
+       final String primaryName = ugi.getShortUserName();
+       final String instanceName = new HadoopKerberosName(ugi.getUserName()).getHostName();
+
+       final SaslServer saslServer = ugi.doAs(new PrivilegedExceptionAction<SaslServer>() {
+         @Override
+         public SaslServer run() throws Exception {
+           return FastSaslServerFactory.getInstance()
+               .createSaslServer(KerberosUtil.KERBEROS_SASL_NAME, primaryName, instanceName, properties,
+                   new KerberosServerCallbackHandler());
+         }
+       });
+       logger.trace("GSSAPI SaslServer created.");
+       return saslServer;
+     } catch (final UndeclaredThrowableException e) {
+       final Throwable cause = e.getCause();
+       logger.debug("Authentication failed.", cause);
+       if (cause instanceof SaslException) {
+         throw (SaslException) cause;
+       } else {
+         throw new SaslException("Unexpected failure trying to authenticate using Kerberos", cause);
+       }
+     } catch (final IOException | InterruptedException e) {
+       logger.debug("Authentication failed.", e);
+       if (e instanceof InterruptedException) {
+         // the interruption is converted to a SaslException; keep the interrupt status set
+         Thread.currentThread().interrupt();
+       }
+       throw new SaslException("Unexpected failure trying to authenticate using Kerberos", e);
+     }
+   }
+
+   /**
+    * Creates a SASL client for the Kerberos mechanism, running as the given user. The service to
+    * authenticate to is resolved from the properties, see {@code getServicePrincipal}.
+    *
+    * @param ugi        user the client runs as
+    * @param properties connection properties, also handed to the SASL client factory
+    * @return the client returned by {@link FastSaslClientFactory}
+    * @throws SaslException if the service principal cannot be resolved or the client cannot be
+    *                       created
+    */
+   @Override
+   public SaslClient createSaslClient(final UserGroupInformation ugi, final Map<String, ?> properties)
+       throws SaslException {
+     final String servicePrincipal = getServicePrincipal(properties);
+
+     final String parts[] = KerberosUtil.splitPrincipalIntoParts(servicePrincipal);
+     final String serviceName = parts[0];
+     final String serviceHostName = parts[1];
+     // ignore parts[2]; GSSAPI gets the realm info from the ticket
+     try {
+       final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
+
+         @Override
+         public SaslClient run() throws Exception {
+           return FastSaslClientFactory.getInstance().createSaslClient(new String[]{KerberosUtil.KERBEROS_SASL_NAME},
+               null /** authorization ID */, serviceName, serviceHostName, properties,
+               new CallbackHandler() {
+                 // no callbacks are supported on the client side; any callback is rejected
+                 @Override
+                 public void handle(final Callback[] callbacks)
+                     throws IOException, UnsupportedCallbackException {
+                   throw new UnsupportedCallbackException(callbacks[0]);
+                 }
+               });
+         }
+       });
+       logger.debug("GSSAPI SaslClient created to authenticate to {} running on {}",
+           serviceName, serviceHostName);
+       return saslClient;
+     } catch (final UndeclaredThrowableException e) {
+       logger.debug("Authentication failed.", e);
+       throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
+           serviceHostName), e.getCause());
+     } catch (final IOException | InterruptedException e) {
+       logger.debug("Authentication failed.", e);
+       if (e instanceof SaslException) {
+         throw (SaslException) e;
+       }
+       if (e instanceof InterruptedException) {
+         // the interruption is converted to a SaslException; keep the interrupt status set
+         Thread.currentThread().interrupt();
+       }
+       throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
+           serviceHostName), e);
+     }
+   }
+
+   @Override
+   public void close() throws Exception {
+     // no-op
+   }
+
+   /**
+    * Server-side callback handler. It authorizes a client only when its authorization ID equals its
+    * authentication ID, and rejects every callback other than {@link AuthorizeCallback}.
+    */
+   private static class KerberosServerCallbackHandler implements CallbackHandler {
+
+     @Override
+     public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+       for (final Callback callback : callbacks) {
+         if (callback instanceof AuthorizeCallback) {
+           final AuthorizeCallback authorizeCallback = (AuthorizeCallback) callback;
+           if (!authorizeCallback.getAuthenticationID()
+               .equals(authorizeCallback.getAuthorizationID())) {
+             throw new SaslException("Drill expects authorization ID and authentication ID to match. " +
+                 "Use inbound impersonation feature so one entity can act on behalf of another.");
+           } else {
+             authorizeCallback.setAuthorized(true);
+           }
+         } else {
+           throw new UnsupportedCallbackException(callback);
+         }
+       }
+     }
+   }
+
+   /**
+    * Resolves the service principal to authenticate to. An explicitly configured
+    * {@link DrillProperties#SERVICE_PRINCIPAL} wins; otherwise the principal is assembled from the
+    * service name (default: {@code drill.principal.primary} system property, or "drill"), the
+    * lower-cased service host, and the realm (default: the realm reported by {@link KerberosUtil}).
+    *
+    * @param properties connection properties
+    * @return the service principal
+    * @throws SaslException if the service host is unknown, or the default realm cannot be resolved
+    */
+   private static String getServicePrincipal(final Map<String, ?> properties) throws SaslException {
+     final String principal = (String) properties.get(DrillProperties.SERVICE_PRINCIPAL);
+     if (principal != null) {
+       return principal;
+     }
+
+     final String serviceHostname = (String) properties.get(DrillProperties.SERVICE_HOST);
+     if (serviceHostname == null) {
+       throw new SaslException("Unknown Drillbit hostname. Check connection parameters?");
+     }
+
+     final String serviceName = (String) properties.get(DrillProperties.SERVICE_NAME);
+     final String realm = (String) properties.get(DrillProperties.REALM);
+     try {
+       return KerberosUtil.getPrincipalFromParts(
+           serviceName == null ? DRILL_SERVICE_NAME : serviceName,
+           serviceHostname.toLowerCase(), // see HADOOP-7988
+           realm == null ? KerberosUtil.getDefaultRealm() : realm
+       );
+     } catch (final ClassNotFoundException | NoSuchMethodException |
+         IllegalAccessException | InvocationTargetException e) {
+       // keep the underlying failure as the cause so the reason for the lookup failure is visible
+       throw new SaslException("Could not resolve realm information. Please set explicitly in connection parameters.",
+           e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
new file mode 100644
index 0000000..5c6eff3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/package-info.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Communication security.
+ * <p>
+ * Drill uses Java's SASL library to authenticate clients (users and other bits). This is achieved using
+ * {@link org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener} on the client-side, and
+ * {@link org.apache.drill.exec.rpc.security.ServerAuthenticationHandler} on the server-side.
+ * <p>
+ * If authentication is enabled, {@link org.apache.drill.exec.rpc.security.AuthenticatorFactory authenticator factory}
+ * implementations are discovered at startup from {@link org.apache.drill.common.scanner.persistence.ScanResult
+ * scan result} using {@link org.apache.drill.exec.rpc.security.AuthenticatorProviderImpl}. At connection time, after
+ * handshake, if either side requires authentication, a series of SASL messages is exchanged. Without successful
+ * authentication, any subsequent message results in a failure and the connection is dropped.
+ * <p>
+ * Out of the box, Drill supports {@link org.apache.drill.exec.rpc.security.kerberos.KerberosFactory KERBEROS}
+ * (through GSSAPI) and {@link org.apache.drill.exec.rpc.security.plain.PlainFactory PLAIN} (through
+ * {@link org.apache.drill.exec.rpc.user.security.UserAuthenticator}) mechanisms.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/DRILL-4280">
+ * DRILL-4280 (design and configuration)</a>
+ * @see <a href="https://docs.oracle.com/javase/7/docs/api/javax/security/sasl/package-summary.html">
+ * Java's SASL Library</a>
+ */
+package org.apache.drill.exec.rpc.security;
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
new file mode 100644
index 0000000..4a0db95
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.plain;
+
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.FastSaslClientFactory;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * {@link AuthenticatorFactory} for the PLAIN (username and password) SASL mechanism.
+ * <p>
+ * The no-argument constructor creates a factory without a {@link UserAuthenticator}; the other constructor
+ * takes the authenticator that is handed to every {@link PlainServer} this factory creates.
+ */
+public class PlainFactory implements AuthenticatorFactory {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlainFactory.class);
+
+  public static final String SIMPLE_NAME = PlainServer.MECHANISM_NAME;
+
+  // null when the factory was created through the no-argument constructor
+  private final UserAuthenticator authenticator;
+
+  /** Creates a factory without a user authenticator. */
+  public PlainFactory() {
+    this.authenticator = null;
+  }
+
+  /**
+   * Creates a factory whose SASL servers verify credentials through the given authenticator.
+   *
+   * @param authenticator authenticator passed to the created SASL servers
+   */
+  public PlainFactory(final UserAuthenticator authenticator) {
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public String getSimpleName() {
+    return SIMPLE_NAME;
+  }
+
+  /**
+   * Installs a default Hadoop {@link Configuration} and returns the current user.
+   *
+   * @param properties not used by this mechanism
+   * @return the current user, as reported by {@link UserGroupInformation#getCurrentUser()}
+   * @throws IOException (a {@link SaslException}) if the current user cannot be determined
+   */
+  @Override
+  public UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException {
+    final Configuration conf = new Configuration();
+    UserGroupInformation.setConfiguration(conf);
+    try {
+      return UserGroupInformation.getCurrentUser();
+    } catch (final IOException e) {
+      logger.debug("Login failed.", e);
+      final Throwable cause = e.getCause();
+      if (cause instanceof LoginException) {
+        throw new SaslException("Failed to login.", cause);
+      }
+      // fall back to the IOException itself so the failure is never reported without a cause
+      throw new SaslException("Unexpected failure trying to login. ", cause != null ? cause : e);
+    }
+  }
+
+  /**
+   * Creates a PLAIN SASL server backed by this factory's authenticator.
+   *
+   * @param ugi        not used by this mechanism
+   * @param properties SASL properties passed to the server
+   * @return a new {@link PlainServer}
+   * @throws SaslException if the server rejects the given properties
+   */
+  @Override
+  public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    return new PlainServer(authenticator, properties);
+  }
+
+  /**
+   * Creates a PLAIN SASL client that answers name and password callbacks from the connection properties.
+   *
+   * @param ugi        not used by this mechanism
+   * @param properties connection properties; {@code DrillProperties.USER} and
+   *                   {@code DrillProperties.PASSWORD} are read and cast to {@code String}
+   * @return the client created by {@link FastSaslClientFactory}
+   * @throws SaslException if the client cannot be created
+   */
+  @Override
+  public SaslClient createSaslClient(final UserGroupInformation ugi, final Map<String, ?> properties)
+      throws SaslException {
+    final String userName = (String) properties.get(DrillProperties.USER);
+    final String password = (String) properties.get(DrillProperties.PASSWORD);
+
+    final CallbackHandler credentialsHandler = new CallbackHandler() {
+      @Override
+      public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        for (final Callback callback : callbacks) {
+          if (callback instanceof NameCallback) {
+            ((NameCallback) callback).setName(userName);
+          } else if (callback instanceof PasswordCallback) {
+            // PasswordCallback accepts null; dereferencing a missing password here would surface as a
+            // NullPointerException from inside the handler instead of a mechanism-level failure
+            ((PasswordCallback) callback).setPassword(password == null ? null : password.toCharArray());
+          } else {
+            throw new UnsupportedCallbackException(callback);
+          }
+        }
+      }
+    };
+
+    return FastSaslClientFactory.getInstance().createSaslClient(new String[]{SIMPLE_NAME},
+        null /* authorization ID */, null, null, properties, credentialsHandler);
+  }
+
+  /** Closes the underlying authenticator, if there is one. */
+  @Override
+  public void close() throws IOException {
+    if (authenticator != null) {
+      authenticator.close();
+    }
+  }
+
+  // used for clients < 1.10
+  public UserAuthenticator getAuthenticator() {
+    return authenticator;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
new file mode 100644
index 0000000..417fca1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/plain/PlainServer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security.plain;
+
+import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
+import org.apache.drill.exec.rpc.user.security.UserAuthenticator;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Plain SaslServer implementation.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc4616">RFC for PLAIN SASL mechanism</a>
+ */
+class PlainServer implements SaslServer {
+
+  // separator between the three parts of a PLAIN message
+  private static final String UTF_8_NULL = "\u0000";
+
+  public static final String MECHANISM_NAME = "PLAIN";
+
+  private final UserAuthenticator authenticator;
+
+  private boolean completed = false;
+  private String authorizationID;
+
+  /**
+   * @param authenticator verifies the received username and password
+   * @param properties    SASL properties, may be null
+   * @throws SaslException if {@link Sasl#POLICY_NOPLAINTEXT} is set to "true" in the properties
+   */
+  PlainServer(final UserAuthenticator authenticator, final Map<String, ?> properties) throws SaslException {
+    if (properties != null) {
+      if ("true".equalsIgnoreCase((String) properties.get(Sasl.POLICY_NOPLAINTEXT))) {
+        throw new SaslException("PLAIN authentication is not permitted.");
+      }
+    }
+    this.authenticator = authenticator;
+  }
+
+  @Override
+  public String getMechanismName() {
+    return MECHANISM_NAME;
+  }
+
+  /**
+   * Parses the single PLAIN message, verifies the credentials, and completes the exchange.
+   *
+   * @param response UTF-8 encoded message: authorization ID, authentication ID and password, separated
+   *                 by NUL characters
+   * @return always null; on success there is no further challenge
+   * @throws SaslException         if the message is null or malformed, the credentials are rejected, no
+   *                               authenticator is available, or the authorization ID differs from the
+   *                               authentication ID
+   * @throws IllegalStateException if authentication already completed
+   */
+  @Override
+  public byte[] evaluateResponse(byte[] response) throws SaslException {
+    if (completed) {
+      throw new IllegalStateException("PLAIN authentication already completed");
+    }
+
+    if (response == null) {
+      throw new SaslException("Received null response");
+    }
+
+    final String payload = new String(response, StandardCharsets.UTF_8);
+
+    // Separator defined in PlainClient is 0
+    // three parts: [ authorizationID, authenticationID, password ]
+    final String[] parts = payload.split(UTF_8_NULL, 3);
+    if (parts.length != 3) {
+      throw new SaslException("Received corrupt response. Expected 3 parts, but received "
+          + parts.length);
+    }
+    final String authenticationID = parts[1];
+    final String password = parts[2];
+    // an empty authorization ID means "act as the authenticated user"
+    final String requestedAuthorizationID = parts[0].isEmpty() ? authenticationID : parts[0];
+
+    if (authenticator == null) {
+      // fail the exchange cleanly rather than with a NullPointerException
+      throw new SaslException("PLAIN authentication is not available: no user authenticator is configured.");
+    }
+
+    try {
+      authenticator.authenticate(authenticationID, password);
+    } catch (final UserAuthenticationException e) {
+      // same message as before, with the original exception kept as the cause
+      throw new SaslException(e.getMessage(), e);
+    }
+
+    if (!requestedAuthorizationID.equals(authenticationID)) {
+      throw new SaslException("Drill expects authorization ID and authentication ID to match. " +
+          "Use inbound impersonation feature so one entity can act on behalf of another.");
+    }
+
+    this.authorizationID = requestedAuthorizationID;
+    completed = true;
+    return null;
+  }
+
+  @Override
+  public boolean isComplete() {
+    return completed;
+  }
+
+  /**
+   * @return the authorization ID established by the completed exchange
+   * @throws IllegalStateException if authentication has not completed
+   */
+  @Override
+  public String getAuthorizationID() {
+    if (completed) {
+      return authorizationID;
+    }
+    throw new IllegalStateException("PLAIN authentication not completed");
+  }
+
+  /**
+   * @return "auth" for {@link Sasl#QOP}, null for any other property
+   * @throws IllegalStateException if authentication has not completed
+   */
+  @Override
+  public Object getNegotiatedProperty(String propName) {
+    if (completed) {
+      return Sasl.QOP.equals(propName) ? "auth" : null;
+    }
+    throw new IllegalStateException("PLAIN authentication not completed");
+  }
+
+  /** Always throws: this mechanism negotiates no security layer. */
+  @Override
+  public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    if (completed) {
+      throw new SaslException("PLAIN supports neither integrity nor privacy");
+    } else {
+      throw new IllegalStateException("PLAIN authentication not completed");
+    }
+  }
+
+  /** Always throws: this mechanism negotiates no security layer. */
+  @Override
+  public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    if (completed) {
+      throw new SaslException("PLAIN supports neither integrity nor privacy");
+    } else {
+      throw new IllegalStateException("PLAIN authentication not completed");
+    }
+  }
+
+  /** Clears the stored authorization ID. */
+  @Override
+  public void dispose() throws SaslException {
+    authorizationID = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8a732c08/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 0b45f52..ecf6f6a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -26,6 +26,7 @@ drill {
org.apache.drill.exec.physical.impl.BatchCreator,
org.apache.drill.exec.physical.impl.RootCreator,
org.apache.drill.exec.rpc.user.security.UserAuthenticator,
+ org.apache.drill.exec.rpc.security.AuthenticatorFactory,
org.apache.drill.exec.store.dfs.FormatPlugin,
org.apache.drill.exec.store.StoragePlugin
],
@@ -36,7 +37,8 @@ drill {
org.apache.drill.exec.expr,
org.apache.drill.exec.physical,
org.apache.drill.exec.store,
- org.apache.drill.exec.rpc.user.security
+ org.apache.drill.exec.rpc.user.security,
+ org.apache.drill.exec.rpc.security
]
}
}
@@ -144,10 +146,11 @@ drill.exec: {
max_chained_user_hops: 3
},
security.user.auth {
- enabled: false,
- impl: "pam",
- pam_profiles: [ "sudo", "login" ]
+ enabled: false
},
+ security.bit.auth {
+ enabled : false
+ }
trace: {
directory: "/tmp/drill-trace",
filesystem: "file:///"