You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/03/12 11:54:36 UTC
[3/5] drill git commit: DRILL-6187: Exception in RPC communication
between DataClient/ControlClient and respective servers when bit-to-bit
security is on
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index aa26fd6..640eb40 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -111,7 +111,22 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
- updateClient(connectionProps);
+
+ newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+ ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+ ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+ .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+ ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+ .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+ ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+ .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+ ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+ .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+ ConfigValueFactory.fromAnyRef(true)));
+
+ updateTestCluster(1, newConfig, connectionProps);
// Run few queries using the new client
testBuilder()
@@ -145,7 +160,22 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
- updateClient(connectionProps);
+
+ newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+ ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+ ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+ .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+ ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+ .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+ ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+ .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+ ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+ .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+ ConfigValueFactory.fromAnyRef(true)));
+
+ updateTestCluster(1, newConfig, connectionProps);
assertTrue(UserRpcMetrics.getInstance().getEncryptedConnectionCount() == 1);
assertTrue(UserRpcMetrics.getInstance().getUnEncryptedConnectionCount() == 0);
@@ -177,10 +207,24 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
krbHelper.clientKeytab.getAbsoluteFile());
+ newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+ ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+ ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+ .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+ ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+ .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+ ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+ .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+ ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+ .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED,
+ ConfigValueFactory.fromAnyRef(true)));
+
Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- updateClient(connectionProps);
+ updateTestCluster(1, newConfig, connectionProps);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 34f75f4..f86d698 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.math3.util.Pair;
import org.apache.drill.exec.work.foreman.FragmentsRunner;
@@ -205,7 +206,7 @@ public class TestDrillbitResilience extends DrillTest {
// create a client
final DrillConfig drillConfig = zkHelper.getConfig();
- drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, null);
+ drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties());
clearAllInjections();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
index 94c8ebf..1098dc4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -87,7 +87,7 @@ public class TestResourceLeak extends DrillTest {
bit = new Drillbit(config, serviceSet);
bit.run();
- client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+ client = QueryTestUtil.createClient(config, serviceSet, 2, new Properties());
}
@Test
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/rpc/pom.xml b/exec/rpc/pom.xml
index ea56574..5ed62ee 100644
--- a/exec/rpc/pom.xml
+++ b/exec/rpc/pom.xml
@@ -77,6 +77,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0f4ef1b..4395db3 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.rpc;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -32,16 +36,17 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-
-import java.util.concurrent.TimeUnit;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Internal.EnumLite;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -69,6 +74,9 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
private final IdlePingHandler pingHandler;
private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null;
+ // Determines if authentication is completed between client and server
+ private boolean authComplete = true;
+
public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
Class<HR> responseClass, Parser<HR> handshakeParser) {
super(rpcMapping);
@@ -133,6 +141,19 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
return false;
}
+ /**
+ * Set's the state for authentication complete.
+ * @param authComplete - state to set. True means authentication between client and server is completed, false
+ * means authentication is in progress.
+ */
+ protected void setAuthComplete(boolean authComplete) {
+ this.authComplete = authComplete;
+ }
+
+ protected boolean isAuthComplete() {
+ return authComplete;
+ }
+
// Save the SslChannel after the SSL handshake so it can be closed later
public void setSslChannel(Channel c) {
@@ -180,7 +201,67 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
return (connection != null) && connection.isActive();
}
- protected abstract void validateHandshake(HR validateHandshake) throws RpcException;
+ protected abstract List<String> validateHandshake(HR validateHandshake) throws RpcException;
+
+ /**
+ * Creates various instances needed to start the SASL handshake. This is called from
+ * {@link BasicClient#validateHandshake(MessageLite)} if authentication is required from server side.
+ * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
+ * @param serverAuthMechanisms - List of auth mechanisms configured on server side
+ */
+ protected abstract void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
+ List<String> serverAuthMechanisms) throws RpcException;
+
+ /**
+ * Main method which starts the SASL handshake for all client channels (user/data/control) once it's determined
+ * after regular RPC handshake that authentication is required by server side. Once authentication is completed
+ * then only the underlying channel is made available to clients to send other RPC messages. Success and failure
+ * events are notified to the connection handler on which client waits.
+ * @param connectionHandler - Connection handler used by client's to know about success/failure conditions.
+ * @param saslProperties - SASL related properties needed to create SASL client.
+ * @param ugi - UserGroupInformation with logged in client side user
+ * @param authFactory - Authentication factory to use for this SASL handshake.
+ * @param rpcType - SASL_MESSAGE rpc type.
+ */
+ protected void startSaslHandshake(final RpcConnectionHandler<CC> connectionHandler,
+ Map<String, ?> saslProperties, UserGroupInformation ugi,
+ AuthenticatorFactory authFactory, T rpcType) {
+ final String mechanismName = authFactory.getSimpleName();
+ try {
+ final SaslClient saslClient = authFactory.createSaslClient(ugi, saslProperties);
+ if (saslClient == null) {
+ final Exception ex = new SaslException(String.format("Cannot initiate authentication using %s mechanism. " +
+ "Insufficient credentials or selected mechanism doesn't support configured security layers?", mechanismName));
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+ return;
+ }
+ connection.setSaslClient(saslClient);
+ } catch (final SaslException e) {
+ logger.error("Failed while creating SASL client for SASL handshake for connection", connection.getName());
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, e);
+ return;
+ }
+
+ logger.debug("Initiating SASL exchange.");
+ new AuthenticationOutcomeListener<>(this, connection, rpcType, ugi,
+ new RpcOutcomeListener<Void>() {
+ @Override
+ public void failed(RpcException ex) {
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+ }
+
+ @Override
+ public void success(Void value, ByteBuf buffer) {
+ authComplete = true;
+ connectionHandler.connectionSucceeded(connection);
+ }
+
+ @Override
+ public void interrupted(InterruptedException ex) {
+ connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
+ }
+ }).initiate(mechanismName);
+ }
protected void finalizeConnection(HR handshake, CC connection) {
// no-op
@@ -204,12 +285,6 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
allowInEventLoop, dataBodies);
}
- // the command itself must be "run" by the caller (to avoid calling inEventLoop)
- protected <M extends MessageLite> RpcCommand<M, CC>
- getInitialCommand(final RpcCommand<M, CC> command) {
- return command;
- }
-
protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
String host, int port) {
ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml;
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
index 0cdca13..3fee5d7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.DrillException;
import org.slf4j.Logger;
import java.net.SocketAddress;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -151,12 +152,21 @@ public class ConnectionMultiListener<T extends EnumLite, CC extends ClientConnec
public void success(HR value, ByteBuf buffer) {
// logger.debug("Handshake received. {}", value);
try {
- parent.validateHandshake(value);
+ final List<String> serverAuthMechanisms = parent.validateHandshake(value);
parent.finalizeConnection(value, parent.connection);
- connectionListener.connectionSucceeded(parent.connection);
- // logger.debug("Handshake completed succesfully.");
+
+ // If auth is required then start the SASL handshake
+ if (serverAuthMechanisms != null) {
+ parent.prepareSaslHandshake(connectionListener, serverAuthMechanisms);
+ } else {
+ connectionListener.connectionSucceeded(parent.connection);
+ logger.debug("Handshake completed successfully.");
+ }
+ } catch (NonTransientRpcException ex) {
+ logger.error("Failure while validating client and server sasl compatibility", ex);
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex);
} catch (Exception ex) {
- logger.debug("Failure while validating handshake", ex);
+ logger.error("Failure while validating handshake", ex);
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index a64a23b..3936170 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -78,7 +78,7 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte
} else {
// logger.debug("No connection active, opening client connection.");
BasicClient<?, C, HS, ?> client = getNewClient();
- ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(client.getInitialCommand(cmd));
+ ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(cmd);
client.connectAsClient(future, handshake, host, port);
future.waitAndRun();
// logger.debug("Connection available and active, command now being run inline.");
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
new file mode 100644
index 0000000..5c34d01
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
+import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumMap;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Handles SASL exchange, on the client-side.
+ *
+ * @param <T> handshake rpc type
+ * @param <C> Client connection type
+ * @param <HS> Handshake send type
+ * @param <HR> Handshake receive type
+ */
+public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientConnection,
+ HS extends MessageLite, HR extends MessageLite>
+ implements RpcOutcomeListener<SaslMessage> {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
+
+ private static final ImmutableMap<SaslStatus, SaslChallengeProcessor>
+ CHALLENGE_PROCESSORS;
+ static {
+ final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
+ map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
+ map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
+ map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
+ CHALLENGE_PROCESSORS = Maps.immutableEnumMap(map);
+ }
+
+ private final BasicClient<T, C, HS, HR> client;
+ private final C connection;
+ private final T saslRpcType;
+ private final UserGroupInformation ugi;
+ private final RpcOutcomeListener<?> completionListener;
+
+ public AuthenticationOutcomeListener(BasicClient<T, C, HS, HR> client,
+ C connection, T saslRpcType, UserGroupInformation ugi,
+ RpcOutcomeListener<?> completionListener) {
+ this.client = client;
+ this.connection = connection;
+ this.saslRpcType = saslRpcType;
+ this.ugi = ugi;
+ this.completionListener = completionListener;
+ }
+
+ public void initiate(final String mechanismName) {
+ logger.trace("Initiating SASL exchange.");
+ try {
+ final ByteString responseData;
+ final SaslClient saslClient = connection.getSaslClient();
+ if (saslClient.hasInitialResponse()) {
+ responseData = ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0]));
+ } else {
+ responseData = ByteString.EMPTY;
+ }
+ client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+ connection,
+ saslRpcType,
+ SaslMessage.newBuilder()
+ .setMechanism(mechanismName)
+ .setStatus(SaslStatus.SASL_START)
+ .setData(responseData)
+ .build(),
+ SaslMessage.class,
+ true /* the connection will not be backed up at this point */);
+ logger.trace("Initiated SASL exchange.");
+ } catch (final Exception e) {
+ completionListener.failed(RpcException.mapException(e));
+ }
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ completionListener.failed(RpcException.mapException(ex));
+ }
+
+ @Override
+ public void success(SaslMessage value, ByteBuf buffer) {
+ logger.trace("Server responded with message of type: {}", value.getStatus());
+ final SaslChallengeProcessor processor = CHALLENGE_PROCESSORS.get(value.getStatus());
+ if (processor == null) {
+ completionListener.failed(RpcException.mapException(
+ new SaslException("Server sent a corrupt message.")));
+ } else {
+ // SaslSuccessProcessor.process disposes saslClient so get mechanism here to use later in logging
+ final String mechanism = connection.getSaslClient().getMechanismName();
+ try {
+ final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection);
+ final SaslMessage saslResponse = processor.process(context);
+
+ if (saslResponse != null) {
+ client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
+ connection, saslRpcType, saslResponse, SaslMessage.class,
+ true /* the connection will not be backed up at this point */);
+ } else {
+ // success
+ completionListener.success(null, null);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}",
+ mechanism, connection.getEncryptionCtxtString());
+ }
+ }
+ } catch (final Exception e) {
+ logger.error("Authentication with encryption context: {} using mechanism {} failed with {}",
+ connection.getEncryptionCtxtString(), mechanism, e.getMessage());
+ completionListener.failed(RpcException.mapException(e));
+ }
+ }
+ }
+
+ @Override
+ public void interrupted(InterruptedException e) {
+ completionListener.interrupted(e);
+ }
+
+ private static class SaslChallengeContext<C extends ClientConnection> {
+
+ final SaslMessage challenge;
+ final UserGroupInformation ugi;
+ final C connection;
+
+ SaslChallengeContext(SaslMessage challenge, UserGroupInformation ugi, C connection) {
+ this.challenge = checkNotNull(challenge);
+ this.ugi = checkNotNull(ugi);
+ this.connection = checkNotNull(connection);
+ }
+ }
+
+ private interface SaslChallengeProcessor {
+
+ /**
+ * Process challenge from server, and return a response.
+ *
+ * Returns null iff SASL exchange is complete and successful.
+ *
+ * @param context challenge context
+ * @return response
+ * @throws Exception in case of any failure
+ */
+ <CC extends ClientConnection>
+ SaslMessage process(SaslChallengeContext<CC> context) throws Exception;
+
+ }
+
+ private static class SaslInProgressProcessor implements SaslChallengeProcessor {
+
+ @Override
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+ final SaslMessage.Builder response = SaslMessage.newBuilder();
+ final SaslClient saslClient = context.connection.getSaslClient();
+
+ final byte[] responseBytes = evaluateChallenge(context.ugi, saslClient,
+ context.challenge.getData().toByteArray());
+
+ final boolean isComplete = saslClient.isComplete();
+ logger.trace("Evaluated challenge. Completed? {}.", isComplete);
+ response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY);
+ // if isComplete, the client will get one more response from server
+ response.setStatus(isComplete ? SaslStatus.SASL_SUCCESS : SaslStatus.SASL_IN_PROGRESS);
+ return response.build();
+ }
+ }
+
+ private static class SaslSuccessProcessor implements SaslChallengeProcessor {
+
+ @Override
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+ final SaslClient saslClient = context.connection.getSaslClient();
+
+ if (saslClient.isComplete()) {
+ handleSuccess(context);
+ return null;
+ } else {
+ // server completed before client; so try once, fail otherwise
+ evaluateChallenge(context.ugi, saslClient, context.challenge.getData().toByteArray()); // discard response
+
+ if (saslClient.isComplete()) {
+ handleSuccess(context);
+ return null;
+ } else {
+ throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
+ }
+ }
+ }
+ }
+
+ private static class SaslFailedProcessor implements SaslChallengeProcessor {
+
+ @Override
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+ throw new SaslException(String.format("Authentication failed. Incorrect credentials? [Details: %s]",
+ context.connection.getEncryptionCtxtString()));
+ }
+ }
+
+ private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient,
+ final byte[] challengeBytes) throws SaslException {
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+ @Override
+ public byte[] run() throws Exception {
+ return saslClient.evaluateChallenge(challengeBytes);
+ }
+ });
+ } catch (final UndeclaredThrowableException e) {
+ throw new SaslException(
+ String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e.getCause());
+ } catch (final IOException | InterruptedException e) {
+ if (e instanceof SaslException) {
+ throw (SaslException) e;
+ } else {
+ throw new SaslException(
+ String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e);
+ }
+ }
+ }
+
+
+ private static <CC extends ClientConnection> void handleSuccess(SaslChallengeContext<CC> context) throws
+ SaslException {
+ final CC connection = context.connection;
+ final SaslClient saslClient = connection.getSaslClient();
+
+ try {
+ // Check if connection was marked for being secure then verify for negotiated QOP value for
+ // correctness.
+ final String negotiatedQOP = saslClient.getNegotiatedProperty(Sasl.QOP).toString();
+ final String expectedQOP = connection.isEncryptionEnabled()
+ ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+ : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+ if (!(negotiatedQOP.equals(expectedQOP))) {
+ throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+ negotiatedQOP, expectedQOP));
+ }
+
+ // Update the rawWrapChunkSize with the negotiated buffer size since we cannot call encode with more than
+ // negotiated size of buffer.
+ if (connection.isEncryptionEnabled()) {
+ final int negotiatedRawSendSize = Integer.parseInt(
+ saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+ if (negotiatedRawSendSize <= 0) {
+ throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+ "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+ negotiatedRawSendSize));
+ }
+ connection.setWrapSizeLimit(negotiatedRawSendSize);
+ }
+ } catch (Exception e) {
+ throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+ e.getMessage()), e);
+ }
+
+ if (connection.isEncryptionEnabled()) {
+ connection.addSecurityHandlers();
+ } else {
+ // Encryption is not required hence we don't need to hold on to saslClient object.
+ connection.disposeSaslClient();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
new file mode 100644
index 0000000..307ae97
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * An implementation of this factory will be initialized once at startup, if the authenticator is enabled
+ * (see {@link #getSimpleName}). For every request for this mechanism (i.e. after establishing a connection),
+ * {@link #createSaslServer} will be invoked on the server-side and {@link #createSaslClient} will be invoked
+ * on the client-side.
+ *
+ * Note:
+ * + Custom authenticators must have a default constructor.
+ *
+ * Examples: PlainFactory and KerberosFactory.
+ */
+public interface AuthenticatorFactory extends AutoCloseable {
+
+ /**
+ * Name of the mechanism, in upper case.
+ *
+ * If this mechanism is present in the list of enabled mechanisms, an instance of this factory is loaded. Note
+ * that the simple name maybe the same as it's SASL name.
+ *
+ * @return mechanism name
+ */
+ String getSimpleName();
+
+ /**
+ * Create and get the login user based on the given properties.
+ *
+ * @param properties config properties
+ * @return ugi
+ * @throws IOException
+ */
+ UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException;
+
+ /**
+ * The caller is responsible for {@link SaslServer#dispose disposing} the returned SaslServer.
+ *
+ * @param ugi ugi
+ * @param properties config properties
+ * @return sasl server
+ * @throws SaslException
+ */
+ SaslServer createSaslServer(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+ /**
+ * The caller is responsible for {@link SaslClient#dispose disposing} the returned SaslClient.
+ *
+ * @param ugi ugi
+ * @param properties config properties
+ * @return sasl client
+ * @throws SaslException
+ */
+ SaslClient createSaslClient(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException;
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
new file mode 100644
index 0000000..9ed85ce
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.Sasl;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class SaslProperties {
+
+ /**
+ * All supported Quality of Protection values which can be negotiated
+ */
+ enum QualityOfProtection {
+ AUTHENTICATION("auth"),
+ INTEGRITY("auth-int"),
+ PRIVACY("auth-conf");
+
+ public final String saslQop;
+
+ QualityOfProtection(String saslQop) {
+ this.saslQop = saslQop;
+ }
+
+ public String getSaslQop() {
+ return saslQop;
+ }
+ }
+
+ /**
+ * Get's the map of minimum set of SaslProperties required during negotiation process either for encryption
+ * or authentication
+ * @param encryptionEnabled - Flag to determine if property needed is for encryption or authentication
+ * @param wrappedChunkSize - Configured wrappedChunkSize to negotiate for.
+ * @return Map of SaslProperties which will be used in negotiation.
+ */
+ public static Map<String, String> getSaslProperties(boolean encryptionEnabled, int wrappedChunkSize) {
+ Map<String, String> saslProps = new HashMap<>();
+
+ if (encryptionEnabled) {
+ saslProps.put(Sasl.STRENGTH, "high");
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.MAX_BUFFER, Integer.toString(wrappedChunkSize));
+ saslProps.put(Sasl.POLICY_NOPLAINTEXT, "true");
+ } else {
+ saslProps.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.getSaslQop());
+ }
+
+ return saslProps;
+ }
+
+ private SaslProperties() {
+
+ }
+}
\ No newline at end of file