You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:45 UTC
[4/6] drill git commit: DRILL-4335: Apache Drill should support
network encryption.
DRILL-4335: Apache Drill should support network encryption.
NOTE: This pull request provides support for on-wire encryption using SASL framework. The communication channel that are covered are:
1) Between Drill JDBC client and Drillbit.
2) Between Drillbit to Drillbit i.e. control/data channels.
3) It has UI change to view encryption is enabled on which network channel and number of encrypted/unencrypted connections for
user/control/data connections.
close apache/drill#773
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ce8bbc01
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ce8bbc01
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ce8bbc01
Branch: refs/heads/master
Commit: ce8bbc01cfde7d714185919be2ca2923d19ea890
Parents: 416ec70
Author: Sorabh Hamirwasia <sh...@maprtech.com>
Authored: Wed Feb 1 18:44:21 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sat May 20 16:16:21 2017 -0700
----------------------------------------------------------------------
.../drill/common/config/DrillProperties.java | 16 +
.../org/apache/drill/exec/ExecConstants.java | 5 +
.../exec/rpc/AbstractClientConnection.java | 42 +-
.../exec/rpc/AbstractConnectionConfig.java | 11 +
.../drill/exec/rpc/AbstractRpcMetrics.java | 52 ++
.../exec/rpc/AbstractServerConnection.java | 55 +-
.../drill/exec/rpc/BitConnectionConfig.java | 32 +-
.../apache/drill/exec/rpc/ConnectionConfig.java | 3 +
.../drill/exec/rpc/control/ControlClient.java | 15 +-
.../exec/rpc/control/ControlConnection.java | 45 +-
.../exec/rpc/control/ControlRpcMetrics.java | 84 +++
.../drill/exec/rpc/control/ControlServer.java | 8 +-
.../drill/exec/rpc/control/ControllerImpl.java | 3 +
.../apache/drill/exec/rpc/data/DataClient.java | 21 +-
.../exec/rpc/data/DataClientConnection.java | 16 +-
.../exec/rpc/data/DataConnectionCreator.java | 5 +-
.../drill/exec/rpc/data/DataRpcMetrics.java | 85 +++
.../apache/drill/exec/rpc/data/DataServer.java | 8 +-
.../exec/rpc/data/DataServerConnection.java | 9 +
.../security/AuthenticationOutcomeListener.java | 104 +++-
.../rpc/security/AuthenticatorProviderImpl.java | 1 -
.../security/ClientAuthenticatorProvider.java | 2 +-
.../drill/exec/rpc/security/SaslProperties.java | 70 +++
.../security/ServerAuthenticationHandler.java | 96 +++-
.../rpc/security/kerberos/KerberosFactory.java | 24 +-
.../apache/drill/exec/rpc/user/UserClient.java | 80 ++-
.../exec/rpc/user/UserConnectionConfig.java | 39 +-
.../drill/exec/rpc/user/UserRpcMetrics.java | 84 +++
.../apache/drill/exec/rpc/user/UserServer.java | 38 +-
.../user/security/UserAuthenticatorFactory.java | 6 +
.../drill/exec/server/rest/DrillRoot.java | 21 +-
.../drill/exec/service/ServiceEngine.java | 50 +-
.../src/main/resources/drill-module.conf | 8 +
.../java-exec/src/main/resources/rest/index.ftl | 21 +
.../drill/exec/rpc/data/TestBitBitKerberos.java | 326 ++++++-----
.../apache/drill/exec/rpc/data/TestBitRpc.java | 18 +-
.../drill/exec/rpc/security/KerberosHelper.java | 150 +++++
.../rpc/user/security/TestUserBitKerberos.java | 163 ++----
.../security/TestUserBitKerberosEncryption.java | 539 ++++++++++++++++++
.../exec/rpc/AbstractRemoteConnection.java | 108 +++-
.../org/apache/drill/exec/rpc/BasicClient.java | 15 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 16 +-
.../drill/exec/rpc/ChunkCreationHandler.java | 99 ++++
.../apache/drill/exec/rpc/ClientConnection.java | 5 +-
.../drill/exec/rpc/EncryptionContext.java | 37 ++
.../drill/exec/rpc/EncryptionContextImpl.java | 95 ++++
.../apache/drill/exec/rpc/RemoteConnection.java | 2 +
.../org/apache/drill/exec/rpc/RpcConstants.java | 27 +-
.../org/apache/drill/exec/rpc/RpcEncoder.java | 2 +-
.../org/apache/drill/exec/rpc/RpcMetrics.java | 29 +
.../org/apache/drill/exec/rpc/SaslCodec.java | 34 ++
.../drill/exec/rpc/SaslDecryptionHandler.java | 160 ++++++
.../drill/exec/rpc/SaslEncryptionHandler.java | 177 ++++++
.../apache/drill/exec/rpc/ServerConnection.java | 4 +-
pom.xml | 2 +
protocol/readme.txt | 16 +-
.../drill/exec/proto/SchemaUserProtos.java | 14 +
.../org/apache/drill/exec/proto/UserProtos.java | 547 ++++++++++++-------
.../exec/proto/beans/BitToUserHandshake.java | 44 ++
.../drill/exec/proto/beans/SaslSupport.java | 4 +-
protocol/src/main/protobuf/User.proto | 3 +
61 files changed, 3122 insertions(+), 673 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
index c7e6e29..75064e0 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common.config;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
@@ -59,6 +60,12 @@ public final class DrillProperties extends Properties {
public static final String KEYTAB = "keytab";
+ public static final String SASL_ENCRYPT = "sasl_encrypt";
+
+ // Should only be used for testing backward compatibility
+ @VisibleForTesting
+ public static final String TEST_SASL_LEVEL = "test_sasl_level";
+
// for subject that has pre-authenticated to KDC (AS) i.e. required credentials are populated in
// Subject's credentials set
public static final String KERBEROS_FROM_SUBJECT = "from_subject";
@@ -110,6 +117,15 @@ public final class DrillProperties extends Properties {
}
}
+ public void merge(final Map<String, String> overrides) {
+ if (overrides == null) {
+ return;
+ }
+ for (final String key : overrides.keySet()) {
+ setProperty(key.toLowerCase(), overrides.get(key));
+ }
+ }
+
/**
* Returns a map of keys and values in this property list where the key and its corresponding value are strings,
* including distinct keys in the default property list if a key of the same name has not already been found from
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index e291524..007e39a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -116,6 +116,11 @@ public interface ExecConstants {
String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
+ String USER_ENCRYPTION_SASL_ENABLED = "drill.exec.security.user.encryption.sasl.enabled";
+ String USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.user.encryption.sasl.max_wrapped_size";
+ String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
+ String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
+
/** Size of JDBC batch queue (in batches) above which throttling begins. */
String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
index 055ea59..ab13c2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractClientConnection.java
@@ -30,8 +30,13 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
private SaslClient saslClient;
+ public AbstractClientConnection(SocketChannel channel, String name,
+ EncryptionContext encryptContext) {
+ super(channel, name, encryptContext);
+ }
+
public AbstractClientConnection(SocketChannel channel, String name) {
- super(channel, name);
+ this(channel, name, new EncryptionContextImpl());
}
protected abstract Logger getLogger();
@@ -40,6 +45,25 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
public void setSaslClient(final SaslClient saslClient) {
checkState(this.saslClient == null);
this.saslClient = saslClient;
+
+ // If encryption is enabled set the backend wrapper instance corresponding to this SaslClient in the connection
+ // object. This is later used to do wrap/unwrap in handlers.
+ if (isEncryptionEnabled()) {
+ saslCodec = new SaslCodec() {
+
+ @Override
+ public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+ checkState(saslClient != null);
+ return saslClient.wrap(data, offset, len);
+ }
+
+ @Override
+ public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+ checkState(saslClient != null);
+ return saslClient.unwrap(data, offset, len);
+ }
+ };
+ }
}
@Override
@@ -49,7 +73,7 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
}
@Override
- public void close() {
+ public void disposeSaslClient() {
try {
if (saslClient != null) {
saslClient.dispose();
@@ -58,6 +82,18 @@ public abstract class AbstractClientConnection extends AbstractRemoteConnection
} catch (final SaslException e) {
getLogger().warn("Unclean disposal", e);
}
- super.close();
+ }
+
+ @Override
+ public void channelClosed(RpcException ex) {
+ // This will be triggered from Netty when a channel is closed. We should cleanup here
+ // as this will handle case for both client closing the connection or server closing the
+ // connection.
+ disposeSaslClient();
+
+ // Decrease the connection counter here since the close handler will be triggered
+ // for all the types of connection
+ decConnectionCounter();
+ super.channelClosed(ex);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
index fb815ab..76c17e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractConnectionConfig.java
@@ -26,10 +26,12 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
private final BufferAllocator allocator;
private final BootStrapContext context;
+ protected EncryptionContext encryptionContext;
protected AbstractConnectionConfig(BufferAllocator allocator, BootStrapContext context) {
this.allocator = allocator;
this.context = context;
+ this.encryptionContext = new EncryptionContextImpl();
}
@Override
@@ -46,4 +48,13 @@ public abstract class AbstractConnectionConfig implements ConnectionConfig {
public AuthenticatorProvider getAuthProvider() {
return context.getAuthProvider();
}
+
+ @Override
+ public boolean isEncryptionEnabled() {
+ return encryptionContext.isEncryptionEnabled();
+ }
+
+ public EncryptionContext getEncryptionCtxt() {
+ return encryptionContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
new file mode 100644
index 0000000..a1fd308
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractRpcMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.codahale.metrics.Gauge;
+import io.netty.buffer.PooledByteBufAllocatorL;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+
+public abstract class AbstractRpcMetrics implements RpcMetrics {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRpcMetrics.class);
+
+ protected boolean useEncryptedCounter;
+
+ public static final String CONNECTION_COUNTER_PREFIX = "drill.connections.rpc.";
+
+ public static final String ALLOCATOR_METRICS_PREFIX = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
+
+ protected void registerAllocatorMetrics(final BufferAllocator allocator, final String metricPrefix) {
+ DrillMetrics.register(metricPrefix + "used", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return allocator.getAllocatedMemory();
+ }
+ });
+
+ DrillMetrics.register(metricPrefix + "peak", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return allocator.getPeakMemoryAllocation();
+ }
+ });
+ }
+
+ public abstract void initialize(boolean useEncryptedCounter, BufferAllocator allocator);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
index db87bfc..f10f6d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.socket.SocketChannel;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.security.SaslProperties;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -42,7 +43,7 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
public AbstractServerConnection(SocketChannel channel, String name, ConnectionConfig config,
RequestHandler<S> handler) {
- super(channel, name);
+ super(channel, name, config.getEncryptionCtxt());
this.config = config;
this.currentHandler = handler;
}
@@ -65,8 +66,8 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
try {
this.saslServer = config.getAuthProvider()
.getAuthenticatorFactory(mechanismName)
- .createSaslServer(UserGroupInformation.getLoginUser(), null
- /** properties; default QOP is auth */);
+ .createSaslServer(UserGroupInformation.getLoginUser(),
+ SaslProperties.getSaslProperties(isEncryptionEnabled(), getMaxWrappedSize()));
} catch (final IOException e) {
getLogger().debug("Login failed.", e);
final Throwable cause = e.getCause();
@@ -76,7 +77,27 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
throw new SaslException("Unexpected failure trying to login.", cause);
}
if (saslServer == null) {
- throw new SaslException("Server could not initiate authentication. Insufficient parameters?");
+ throw new SaslException(String.format("Server cannot initiate authentication using %s mechanism. Insufficient" +
+ " parameters or selected mechanism doesn't support configured security layers ?", mechanismName));
+ }
+
+ // If encryption is enabled set the backend wrapper instance corresponding to this SaslServer in the connection
+ // object. This is later used to do wrap/unwrap in handlers.
+ if (isEncryptionEnabled()) {
+ saslCodec = new SaslCodec() {
+
+ @Override
+ public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+ checkState(saslServer != null);
+ return saslServer.wrap(data, offset, len);
+ }
+
+ @Override
+ public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+ checkState(saslServer != null);
+ return saslServer.unwrap(data, offset, len);
+ }
+ };
}
}
@@ -110,7 +131,17 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
}
@Override
- public void close() {
+ public void setEncryption(boolean encrypted) {
+ throw new UnsupportedOperationException("Changing encryption setting on server connection is not permitted.");
+ }
+
+ @Override
+ public void setMaxWrappedSize(int maxWrappedSize) {
+ throw new UnsupportedOperationException("Changing maxWrappedSize setting on server connection is not permitted.");
+ }
+
+ @Override
+ public void disposeSaslServer() {
try {
if (saslServer != null) {
saslServer.dispose();
@@ -119,6 +150,18 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
} catch (final SaslException e) {
getLogger().warn("Unclean disposal.", e);
}
- super.close();
+ }
+
+ @Override
+ public void channelClosed(RpcException ex) {
+ // This will be triggered from Netty when a channel is closed. We should cleanup here
+ // as this will handle case for both client closing the connection or server closing the
+ // connection.
+ disposeSaslServer();
+
+ // Decrease the connection counter here since the close handler will be triggered
+ // for all the types of connection
+ decConnectionCounter();
+ super.channelClosed(ex);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
index 71e5a86..7d9ebec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.security.AuthStringUtil;
import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
@@ -46,16 +47,38 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
super(allocator, context);
final DrillConfig config = context.getConfig();
+ final AuthenticatorProvider authProvider = getAuthProvider();
+
if (config.getBoolean(ExecConstants.BIT_AUTHENTICATION_ENABLED)) {
this.authMechanismToUse = config.getString(ExecConstants.BIT_AUTHENTICATION_MECHANISM);
try {
- getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+ authProvider.getAuthenticatorFactory(authMechanismToUse);
} catch (final SaslException e) {
throw new DrillbitStartupException(String.format(
"'%s' mechanism not found for bit-to-bit authentication. Please check authentication configuration.",
authMechanismToUse));
}
- logger.info("Configured bit-to-bit connections to require authentication using: {}", authMechanismToUse);
+
+ // Update encryption related configurations
+ encryptionContext.setEncryption(config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED));
+ final int maxWrappedSize = config.getInt(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE);
+
+ if (maxWrappedSize <= 0) {
+ throw new DrillbitStartupException(String.format("Invalid value configured for " +
+ "bit.encryption.sasl.max_wrapped_size. Must be a positive integer in bytes with a recommended max value " +
+ "of %s", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE));
+ } else if (maxWrappedSize > RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE) {
+ logger.warn("The configured value of bit.encryption.sasl.max_wrapped_size is too big. This may cause higher" +
+ " memory pressure. [Details: Recommended max value is %s]", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
+ }
+ encryptionContext.setMaxWrappedSize(maxWrappedSize);
+
+ logger.info("Configured bit-to-bit connections to require authentication using: {} with encryption: {}",
+ authMechanismToUse, encryptionContext.getEncryptionCtxtString());
+
+ } else if (config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED)) {
+ throw new DrillbitStartupException("Invalid security configuration. Encryption using SASL is enabled with " +
+ "authentication disabled. Please check the security.bit configurations.");
} else {
this.authMechanismToUse = null;
}
@@ -78,7 +101,8 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
return getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
}
- public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint) throws IOException {
+ public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint,
+ final Map<String, String> overrides) throws IOException {
final DrillProperties properties = DrillProperties.createEmpty();
final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
@@ -93,6 +117,8 @@ public abstract class BitConnectionConfig extends AbstractConnectionConfig {
properties.setProperty(DrillProperties.SERVICE_PRINCIPAL, loginPrincipal.toString());
}
}
+
+ properties.merge(overrides);
return properties.stringPropertiesAsMap();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
index 706b088..5b8a70b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ConnectionConfig.java
@@ -31,4 +31,7 @@ public interface ConnectionConfig {
AuthenticatorProvider getAuthProvider();
+ boolean isEncryptionEnabled();
+
+ EncryptionContext getEncryptionCtxt();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index 6ebe1c4..a46e968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.rpc.control;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
@@ -36,13 +38,14 @@ import org.apache.drill.exec.rpc.RpcCommand;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.FailingRequestHandler;
+import org.apache.drill.exec.rpc.security.SaslProperties;
-import com.google.protobuf.MessageLite;
import org.apache.hadoop.security.UserGroupInformation;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake> {
@@ -105,9 +108,12 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
final SaslClient saslClient;
try {
+ final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+ connection.getMaxWrappedSize());
+
saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
.createSaslClient(UserGroupInformation.getLoginUser(),
- config.getSaslClientProperties(remoteEndpoint));
+ config.getSaslClientProperties(remoteEndpoint, saslProperties));
} catch (final IOException e) {
throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
}
@@ -118,7 +124,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
} else {
if (config.getAuthMechanismToUse() != null) { // local requires authentication
throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
- remoteEndpoint.getAddress()));
+ remoteEndpoint.getAddress()));
}
}
}
@@ -126,6 +132,9 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@Override
protected void finalizeConnection(BitControlHandshake handshake, ControlConnection connection) {
connection.setEndpoint(handshake.getEndpoint());
+
+ // Increment the Control Connection counter.
+ connection.incConnectionCounter();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index a50a3b0..70189d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -18,15 +18,20 @@
package org.apache.drill.exec.rpc.control;
import com.google.protobuf.MessageLite;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.SocketChannel;
+
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.AbstractServerConnection;
import org.apache.drill.exec.rpc.ClientConnection;
import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.SaslCodec;
+
import org.slf4j.Logger;
import javax.security.sasl.SaslClient;
@@ -115,6 +120,24 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
public void setSaslClient(final SaslClient saslClient) {
checkState(this.saslClient == null);
this.saslClient = saslClient;
+
+ // If encryption is enabled set the backend wrapper instance corresponding to this SaslClient in the connection
+ // object. This is later used to do wrap/unwrap in handlers.
+ if (isEncryptionEnabled()) {
+ saslCodec = new SaslCodec() {
+ @Override
+ public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
+ assert saslClient != null;
+ return saslClient.wrap(data, offset, len);
+ }
+
+ @Override
+ public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
+ assert saslClient != null;
+ return saslClient.unwrap(data, offset, len);
+ }
+ };
+ }
}
@Override
@@ -124,7 +147,7 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
}
@Override
- public void close() {
+ public void disposeSaslClient() {
try {
if (saslClient != null) {
saslClient.dispose();
@@ -133,7 +156,25 @@ public class ControlConnection extends AbstractServerConnection<ControlConnectio
} catch (final SaslException e) {
getLogger().warn("Unclean disposal", e);
}
- super.close();
}
+ @Override
+ public void channelClosed(RpcException ex) {
+ // This will be triggered from Netty when a channel is closed. We should cleanup here
+ // as this will handle case for both client closing the connection or server closing the
+ // connection.
+ disposeSaslClient();
+
+ super.channelClosed(ex);
+ }
+
+ @Override
+ public void incConnectionCounter() {
+ ControlRpcMetrics.getInstance().addConnectionCount();
+ }
+
+ @Override
+ public void decConnectionCounter() {
+ ControlRpcMetrics.getInstance().decConnectionCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
new file mode 100644
index 0000000..ae9e7cc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcMetrics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to bit control rpc layer
+ */
+class ControlRpcMetrics extends AbstractRpcMetrics {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlRpcMetrics.class);
+
+ // Total number of control connection's as client and server for a DrillBit.
+ // i.e. Sum of incoming and outgoing control connections.
+ private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "control.encrypted");
+
+ private static final Counter unencryptedConnection = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "control.unencrypted");
+
+ private static final RpcMetrics INSTANCE = new ControlRpcMetrics();
+
+ // prevent instantiation
+ private ControlRpcMetrics() {
+ }
+
+ public static RpcMetrics getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Should only be called when first access to getInstance is made. In this case inside {@link ControllerImpl}.
+ * {@link ControlConnection} using the singleton instance should not call initialize.
+ *
+ * @param useEncryptedCounter
+ * @param allocator
+ */
+ @Override
+ public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+ this.useEncryptedCounter = useEncryptedCounter;
+ registerAllocatorMetrics(allocator);
+ }
+
+ @Override
+ public void addConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.inc();
+ } else {
+ unencryptedConnection.inc();
+ }
+ }
+
+ @Override
+ public void decConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.dec();
+ } else {
+ unencryptedConnection.dec();
+ }
+ }
+
+ private void registerAllocatorMetrics(final BufferAllocator allocator) {
+ registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.control.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 9e733df..09f6705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.rpc.control;
+import com.google.protobuf.MessageLite;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -30,8 +30,6 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
-import com.google.protobuf.MessageLite;
-
public class ControlServer extends BasicServer<RpcType, ControlConnection>{
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
@@ -105,6 +103,10 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
if (config.getAuthMechanismToUse() != null) {
builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
}
+
+ // Increase the Control Connection counter on server side
+ connection.incConnectionCounter();
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 2bf5ad3..7ce2e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -48,6 +48,9 @@ public class ControllerImpl implements Controller {
config = new ControlConnectionConfig(allocator, context, handler);
this.connectionRegistry = new ConnectionManagerRegistry(config);
this.handlerRegistry = handler.getHandlerRegistry();
+
+ // Initialize the singleton instance of ControlRpcMetrics.
+ ((ControlRpcMetrics)ControlRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index a37008d..603168d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -18,11 +18,11 @@
package org.apache.drill.exec.rpc.data;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -34,15 +34,15 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcCommand;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-
-import com.google.protobuf.MessageLite;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
+import org.apache.drill.exec.rpc.security.SaslProperties;
import org.apache.hadoop.security.UserGroupInformation;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> {
@@ -72,7 +72,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
@Override
protected DataClientConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- this.connection = new DataClientConnection(channel, this);
+ this.connection = new DataClientConnection(channel, this, config.getEncryptionCtxt());
return connection;
}
@@ -107,9 +107,13 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
final SaslClient saslClient;
try {
+
+ final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+ connection.getMaxWrappedSize());
+
saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
.createSaslClient(UserGroupInformation.getLoginUser(),
- config.getSaslClientProperties(remoteEndpoint));
+ config.getSaslClientProperties(remoteEndpoint, saslProperties));
} catch (final IOException e) {
throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
}
@@ -126,6 +130,11 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
}
@Override
+ protected void finalizeConnection(BitServerHandshake handshake, DataClientConnection connection) {
+ // Increment the Data Connection counter.
+ connection.incConnectionCounter();
+ }
+
protected <M extends MessageLite> RpcCommand<M, DataClientConnection>
getInitialCommand(final RpcCommand<M, DataClientConnection> command) {
final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command);
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index 625ab25..6ada2f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.rpc.AbstractClientConnection;
+import org.apache.drill.exec.rpc.EncryptionContext;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.protobuf.MessageLite;
@@ -37,8 +38,9 @@ public class DataClientConnection extends AbstractClientConnection {
private final DataClient client;
private final UUID id;
- public DataClientConnection(SocketChannel channel, DataClient client) {
- super(channel, "data client");
+ public DataClientConnection(SocketChannel channel, DataClient client,
+ EncryptionContext encryptionContextImpl) {
+ super(channel, "data client", encryptionContextImpl);
this.client = client;
this.id = UUID.randomUUID();
}
@@ -88,4 +90,14 @@ public class DataClientConnection extends AbstractClientConnection {
protected Logger getLogger() {
return logger;
}
+
+ @Override
+ public void incConnectionCounter() {
+ DataRpcMetrics.getInstance().addConnectionCount();
+ }
+
+ @Override
+ public void decConnectionCounter() {
+ DataRpcMetrics.getInstance().decConnectionCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 25c83b3..27b2250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -44,6 +44,9 @@ public class DataConnectionCreator implements AutoCloseable {
public DataConnectionCreator(BootStrapContext context, BufferAllocator allocator, WorkEventBus workBus,
WorkerBee bee) throws DrillbitStartupException {
config = new DataConnectionConfig(allocator, context, new DataServerRequestHandler(workBus, bee));
+
+ // Initialize the singleton instance of DataRpcMetrics.
+ ((DataRpcMetrics) DataRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
}
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) {
@@ -59,7 +62,7 @@ public class DataConnectionCreator implements AutoCloseable {
public DataTunnel getTunnel(DrillbitEndpoint endpoint) {
DataConnectionManager newManager = new DataConnectionManager(endpoint, config);
DataConnectionManager oldManager = connectionManager.putIfAbsent(endpoint, newManager);
- if(oldManager != null){
+ if (oldManager != null) {
newManager = oldManager;
}
return new DataTunnel(newManager);
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
new file mode 100644
index 0000000..997df57
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcMetrics.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to bit data rpc layer
+ */
+class DataRpcMetrics extends AbstractRpcMetrics {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataRpcMetrics.class);
+
+ // Total number of data connection's as client and server for a DrillBit.
+ // i.e. Sum of incoming and outgoing data connections.
+ private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "data.encrypted");
+
+ private static final Counter unencryptedConnection = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "data.unencrypted");
+
+ private static final RpcMetrics INSTANCE = new DataRpcMetrics();
+
+ // prevent instantiation
+ private DataRpcMetrics() {
+ }
+
+ public static RpcMetrics getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Should only be called when first access to getInstance is made. In this case inside {@link DataConnectionCreator}.
+ * {@link DataServerConnection} and {@link DataClientConnection} using the singleton instance should not call
+ * initialize.
+ *
+ * @param useEncryptedCounter
+ * @param allocator
+ */
+ @Override
+ public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+ this.useEncryptedCounter = useEncryptedCounter;
+ registerAllocatorMetrics(allocator);
+ }
+
+ @Override
+ public void addConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.inc();
+ } else {
+ unencryptedConnection.inc();
+ }
+ }
+
+ @Override
+ public void decConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.dec();
+ } else {
+ unencryptedConnection.dec();
+ }
+ }
+
+ private void registerAllocatorMetrics(final BufferAllocator allocator) {
+ registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.data.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 33270fd..9e31d6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.rpc.data;
+import com.google.protobuf.MessageLite;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -31,8 +31,6 @@ import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RpcException;
-import com.google.protobuf.MessageLite;
-
public class DataServer extends BasicServer<RpcType, DataServerConnection> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
@@ -84,6 +82,10 @@ public class DataServer extends BasicServer<RpcType, DataServerConnection> {
if (config.getAuthMechanismToUse() != null) {
builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
}
+
+ // Increase the Data Connection counter on server side.
+ connection.incConnectionCounter();
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
index 70e262f..41a4b1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
@@ -39,4 +39,13 @@ public class DataServerConnection extends AbstractServerConnection<DataServerCon
return logger;
}
+ @Override
+ public void incConnectionCounter() {
+ DataRpcMetrics.getInstance().addConnectionCount();
+ }
+
+ @Override
+ public void decConnectionCounter() {
+ DataRpcMetrics.getInstance().decConnectionCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
index 9c74ddc..7f51142 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.hadoop.security.UserGroupInformation;
+import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import java.io.IOException;
@@ -55,7 +56,8 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class);
- private static final ImmutableMap<SaslStatus, SaslChallengeProcessor> CHALLENGE_PROCESSORS;
+ private static final ImmutableMap<SaslStatus, SaslChallengeProcessor>
+ CHALLENGE_PROCESSORS;
static {
final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class);
map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
@@ -99,7 +101,7 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
.setData(responseData)
.build(),
SaslMessage.class,
- true /** the connection will not be backed up at this point */);
+ true /* the connection will not be backed up at this point */);
logger.trace("Initiated SASL exchange.");
} catch (final Exception e) {
completionListener.failed(RpcException.mapException(e));
@@ -120,19 +122,24 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
new SaslException("Server sent a corrupt message.")));
} else {
try {
- final SaslChallengeContext context = new SaslChallengeContext(value, connection.getSaslClient(), ugi);
-
+ final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection);
final SaslMessage saslResponse = processor.process(context);
if (saslResponse != null) {
client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener),
connection, saslRpcType, saslResponse, SaslMessage.class,
- true /** the connection will not be backed up at this point */);
+ true /* the connection will not be backed up at this point */);
} else {
// success
completionListener.success(null, null);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}",
+ connection.getSaslClient().getMechanismName(), connection.getEncryptionCtxtString());
+ }
}
} catch (final Exception e) {
+ logger.error("Authentication with encryption context: {} using mechanism {} failed with {}",
+ connection.getEncryptionCtxtString(), connection.getSaslClient().getMechanismName(), e.getMessage());
completionListener.failed(RpcException.mapException(e));
}
}
@@ -143,16 +150,16 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
completionListener.interrupted(e);
}
- private static class SaslChallengeContext {
+ private static class SaslChallengeContext<C extends ClientConnection> {
final SaslMessage challenge;
- final SaslClient saslClient;
final UserGroupInformation ugi;
+ final C connection;
- SaslChallengeContext(SaslMessage challenge, SaslClient saslClient, UserGroupInformation ugi) {
+ SaslChallengeContext(SaslMessage challenge, UserGroupInformation ugi, C connection) {
this.challenge = checkNotNull(challenge);
- this.saslClient = checkNotNull(saslClient);
this.ugi = checkNotNull(ugi);
+ this.connection = checkNotNull(connection);
}
}
@@ -165,22 +172,24 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
*
* @param context challenge context
* @return response
- * @throws Exception
+ * @throws Exception in case of any failure
*/
- SaslMessage process(SaslChallengeContext context) throws Exception;
+ <CC extends ClientConnection>
+ SaslMessage process(SaslChallengeContext<CC> context) throws Exception;
}
private static class SaslInProgressProcessor implements SaslChallengeProcessor {
@Override
- public SaslMessage process(SaslChallengeContext context) throws Exception {
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
final SaslMessage.Builder response = SaslMessage.newBuilder();
+ final SaslClient saslClient = context.connection.getSaslClient();
- final byte[] responseBytes = evaluateChallenge(context.ugi, context.saslClient,
+ final byte[] responseBytes = evaluateChallenge(context.ugi, saslClient,
context.challenge.getData().toByteArray());
- final boolean isComplete = context.saslClient.isComplete();
+ final boolean isComplete = saslClient.isComplete();
logger.trace("Evaluated challenge. Completed? {}.", isComplete);
response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY);
// if isComplete, the client will get one more response from server
@@ -192,20 +201,18 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
private static class SaslSuccessProcessor implements SaslChallengeProcessor {
@Override
- public SaslMessage process(SaslChallengeContext context) throws Exception {
- if (context.saslClient.isComplete()) {
- logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
- // setup security layers here..
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+ final SaslClient saslClient = context.connection.getSaslClient();
+
+ if (saslClient.isComplete()) {
+ handleSuccess(context);
return null;
} else {
-
// server completed before client; so try once, fail otherwise
- evaluateChallenge(context.ugi, context.saslClient,
- context.challenge.getData().toByteArray()); // discard response
+ evaluateChallenge(context.ugi, saslClient, context.challenge.getData().toByteArray()); // discard response
- if (context.saslClient.isComplete()) {
- logger.trace("Successfully authenticated to server using {}", context.saslClient.getMechanismName());
- // setup security layers here..
+ if (saslClient.isComplete()) {
+ handleSuccess(context);
return null;
} else {
throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?");
@@ -217,8 +224,9 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
private static class SaslFailedProcessor implements SaslChallengeProcessor {
@Override
- public SaslMessage process(SaslChallengeContext context) throws Exception {
- throw new SaslException("Authentication failed. Incorrect credentials?");
+ public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception {
+ throw new SaslException(String.format("Authentication failed. Incorrect credentials? [Details: %s]",
+ context.connection.getEncryptionCtxtString()));
}
}
@@ -243,4 +251,48 @@ public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientC
}
}
}
+
+
+ private static <CC extends ClientConnection> void handleSuccess(SaslChallengeContext<CC> context) throws
+ SaslException {
+ final CC connection = context.connection;
+ final SaslClient saslClient = connection.getSaslClient();
+
+ try {
+ // Check if connection was marked for being secure then verify for negotiated QOP value for
+ // correctness.
+ final String negotiatedQOP = saslClient.getNegotiatedProperty(Sasl.QOP).toString();
+ final String expectedQOP = connection.isEncryptionEnabled()
+ ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+ : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+ if (!(negotiatedQOP.equals(expectedQOP))) {
+ throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+ negotiatedQOP, expectedQOP));
+ }
+
+ // Update the rawWrapChunkSize with the negotiated buffer size since we cannot call encode with more than
+ // negotiated size of buffer.
+ if (connection.isEncryptionEnabled()) {
+ final int negotiatedRawSendSize = Integer.parseInt(
+ saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+ if (negotiatedRawSendSize <= 0) {
+ throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+ "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+ negotiatedRawSendSize));
+ }
+ connection.setWrapSizeLimit(negotiatedRawSendSize);
+ }
+ } catch (Exception e) {
+ throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+ e.getMessage()), e);
+ }
+
+ if (connection.isEncryptionEnabled()) {
+ connection.addSecurityHandlers();
+ } else {
+ // Encryption is not required hence we don't need to hold on to saslClient object.
+ connection.disposeSaslClient();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
index f4c60e7..cfb9512 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorProviderImpl.java
@@ -137,5 +137,4 @@ public class AuthenticatorProviderImpl implements AuthenticatorProvider {
AutoCloseables.close(authFactories.values());
authFactories.clear();
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
index bdcbcf5..5cac208 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ClientAuthenticatorProvider.java
@@ -48,7 +48,7 @@ public class ClientAuthenticatorProvider implements AuthenticatorProvider {
// Mapping: simple name -> authenticator factory
private final Map<String, AuthenticatorFactory> authFactories = CaseInsensitiveMap.newHashMapWithExpectedSize(5);
- public ClientAuthenticatorProvider() {
+ private ClientAuthenticatorProvider() {
// factories provided by Drill
final KerberosFactory kerberosFactory = new KerberosFactory();
authFactories.put(kerberosFactory.getSimpleName(), kerberosFactory);
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
new file mode 100644
index 0000000..9ed85ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+import javax.security.sasl.Sasl;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class SaslProperties {
+
+ /**
+ * All supported Quality of Protection values which can be negotiated
+ */
+ enum QualityOfProtection {
+ AUTHENTICATION("auth"),
+ INTEGRITY("auth-int"),
+ PRIVACY("auth-conf");
+
+ public final String saslQop;
+
+ QualityOfProtection(String saslQop) {
+ this.saslQop = saslQop;
+ }
+
+ public String getSaslQop() {
+ return saslQop;
+ }
+ }
+
+ /**
+ * Get's the map of minimum set of SaslProperties required during negotiation process either for encryption
+ * or authentication
+ * @param encryptionEnabled - Flag to determine if property needed is for encryption or authentication
+ * @param wrappedChunkSize - Configured wrappedChunkSize to negotiate for.
+ * @return Map of SaslProperties which will be used in negotiation.
+ */
+ public static Map<String, String> getSaslProperties(boolean encryptionEnabled, int wrappedChunkSize) {
+ Map<String, String> saslProps = new HashMap<>();
+
+ if (encryptionEnabled) {
+ saslProps.put(Sasl.STRENGTH, "high");
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.MAX_BUFFER, Integer.toString(wrappedChunkSize));
+ saslProps.put(Sasl.POLICY_NOPLAINTEXT, "true");
+ } else {
+ saslProps.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.getSaslQop());
+ }
+
+ return saslProps;
+ }
+
+ private SaslProperties() {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
index bf34d57..ddd216f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.ServerConnection;
import org.apache.hadoop.security.UserGroupInformation;
+import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
@@ -87,7 +88,7 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
try {
saslResponse = SaslMessage.PARSER.parseFrom(new ByteBufInputStream(pBody));
} catch (final InvalidProtocolBufferException e) {
- handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+ handleAuthFailure(connection, sender, e, saslResponseType);
return;
}
@@ -95,17 +96,17 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
final SaslResponseProcessor processor = RESPONSE_PROCESSORS.get(saslResponse.getStatus());
if (processor == null) {
logger.info("Unknown message type from client from {}. Will stop authentication.", remoteAddress);
- handleAuthFailure(remoteAddress, sender, new SaslException("Received unexpected message"),
+ handleAuthFailure(connection, sender, new SaslException("Received unexpected message"),
saslResponseType);
return;
}
- final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, remoteAddress,
- sender, requestHandler, saslResponseType);
+ final SaslResponseContext<S, T> context = new SaslResponseContext<>(saslResponse, connection, sender,
+ requestHandler, saslResponseType);
try {
processor.process(context);
} catch (final Exception e) {
- handleAuthFailure(remoteAddress, sender, e, saslResponseType);
+ handleAuthFailure(connection, sender, e, saslResponseType);
}
} else {
@@ -115,9 +116,9 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
// but the client should not be making any requests before authenticating.
// drop connection
throw new RpcException(
- String.format("Request of type %d is not allowed without authentication. " +
- "Client on %s must authenticate before making requests. Connection dropped.",
- rpcType, remoteAddress));
+ String.format("Request of type %d is not allowed without authentication. Client on %s must authenticate " +
+ "before making requests. Connection dropped. [Details: %s]",
+ rpcType, remoteAddress, connection.getEncryptionCtxtString()));
}
}
@@ -125,16 +126,14 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
final SaslMessage saslResponse;
final S connection;
- final String remoteAddress;
final ResponseSender sender;
final RequestHandler<S> requestHandler;
final T saslResponseType;
- SaslResponseContext(SaslMessage saslResponse, S connection, String remoteAddress, ResponseSender sender,
+ SaslResponseContext(SaslMessage saslResponse, S connection, ResponseSender sender,
RequestHandler<S> requestHandler, T saslResponseType) {
this.saslResponse = checkNotNull(saslResponse);
this.connection = checkNotNull(connection);
- this.remoteAddress = checkNotNull(remoteAddress);
this.sender = checkNotNull(sender);
this.requestHandler = checkNotNull(requestHandler);
this.saslResponseType = checkNotNull(saslResponseType);
@@ -208,8 +207,11 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
handleSuccess(context, challenge, saslServer);
} else {
- logger.info("Failed to authenticate client from {}", context.remoteAddress);
- throw new SaslException("Client allegedly succeeded authentication, but server did not. Suspicious?");
+ final S connection = context.connection;
+ logger.info("Failed to authenticate client from {} with encryption context:{}",
+ connection.getRemoteAddress().toString(), connection.getEncryptionCtxtString());
+ throw new SaslException(String.format("Client allegedly succeeded authentication but server did not. " +
+ "Suspicious? [Details: %s]", connection.getEncryptionCtxtString()));
}
}
}
@@ -219,9 +221,11 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
@Override
public <S extends ServerConnection<S>, T extends EnumLite>
void process(SaslResponseContext<S, T> context) throws Exception {
- logger.info("Client from {} failed authentication graciously, and does not want to continue.",
- context.remoteAddress);
- throw new SaslException("Client graciously failed authentication");
+ final S connection = context.connection;
+ logger.info("Client from {} failed authentication with encryption context:{} graciously, and does not want to " +
+ "continue.", connection.getRemoteAddress().toString(), connection.getEncryptionCtxtString());
+ throw new SaslException(String.format("Client graciously failed authentication. [Details: %s]",
+ connection.getEncryptionCtxtString()));
}
}
@@ -251,25 +255,67 @@ public class ServerAuthenticationHandler<S extends ServerConnection<S>, T extend
private static <S extends ServerConnection<S>, T extends EnumLite>
void handleSuccess(final SaslResponseContext<S, T> context, final SaslMessage.Builder challenge,
final SaslServer saslServer) throws IOException {
- context.connection.changeHandlerTo(context.requestHandler);
- context.connection.finalizeSaslSession();
- context.sender.send(new Response(context.saslResponseType, challenge.build()));
- // setup security layers here..
+ final S connection = context.connection;
+ connection.changeHandlerTo(context.requestHandler);
+ connection.finalizeSaslSession();
+
+ // Check the negotiated property before sending the response back to client
+ try {
+ final String negotiatedQOP = saslServer.getNegotiatedProperty(Sasl.QOP).toString();
+ final String expectedQOP = (connection.isEncryptionEnabled())
+ ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop()
+ : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop();
+
+ if (!(negotiatedQOP.equals(expectedQOP))) {
+ throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s",
+ negotiatedQOP, expectedQOP));
+ }
+
+ // Update the rawWrapSendSize with the negotiated rawSendSize since we cannot call encode with more than the
+ // negotiated size of buffer
+ if (connection.isEncryptionEnabled()) {
+ final int negotiatedRawSendSize = Integer.parseInt(
+ saslServer.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString());
+ if (negotiatedRawSendSize <= 0) {
+ throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " +
+ "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.",
+ negotiatedRawSendSize));
+ }
+ connection.setWrapSizeLimit(negotiatedRawSendSize);
+ }
+ } catch (IllegalStateException | NumberFormatException e) {
+ throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)",
+ e.getMessage()), e);
+ }
if (logger.isTraceEnabled()) {
- logger.trace("Authenticated {} successfully using {} from {}", saslServer.getAuthorizationID(),
- saslServer.getMechanismName(), context.remoteAddress);
+ logger.trace("Authenticated {} successfully using {} from {} with encryption context {}",
+ saslServer.getAuthorizationID(), saslServer.getMechanismName(), connection.getRemoteAddress().toString(),
+ connection.getEncryptionCtxtString());
+ }
+
+ // All checks have passed let's send the response back to client before adding handlers.
+ context.sender.send(new Response(context.saslResponseType, challenge.build()));
+
+ if (connection.isEncryptionEnabled()) {
+ connection.addSecurityHandlers();
+ } else {
+ // Encryption is not required hence we don't need to hold on to saslServer object.
+ connection.disposeSaslServer();
}
}
private static final SaslMessage SASL_FAILED_MESSAGE =
SaslMessage.newBuilder().setStatus(SaslStatus.SASL_FAILED).build();
- private static <T extends EnumLite>
- void handleAuthFailure(final String remoteAddress, final ResponseSender sender,
+ private static <S extends ServerConnection<S>, T extends EnumLite>
+ void handleAuthFailure(final S connection, final ResponseSender sender,
final Exception e, final T saslResponseType) throws RpcException {
- logger.debug("Authentication failed from client {} due to {}", remoteAddress, e);
+ final String remoteAddress = connection.getRemoteAddress().toString();
+
+ logger.debug("Authentication using mechanism {} with encryption context {} failed from client {} due to {}",
+ connection.getSaslServer().getMechanismName(), connection.getEncryptionCtxtString(), remoteAddress, e);
// inform the client that authentication failed, and no more
sender.send(new Response(saslResponseType, SASL_FAILED_MESSAGE));
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
index 855dd8b..e14d411 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/kerberos/KerberosFactory.java
@@ -33,6 +33,7 @@ import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.LoginException;
import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
@@ -93,6 +94,7 @@ public class KerberosFactory implements AuthenticatorFactory {
@Override
public SaslServer createSaslServer(final UserGroupInformation ugi, final Map<String, ?> properties)
throws SaslException {
+ final String qopValue = properties.containsKey(Sasl.QOP) ? properties.get(Sasl.QOP).toString() : "auth";
try {
final String primaryName = ugi.getShortUserName();
final String instanceName = new HadoopKerberosName(ugi.getUserName()).getHostName();
@@ -105,7 +107,7 @@ public class KerberosFactory implements AuthenticatorFactory {
new KerberosServerCallbackHandler());
}
});
- logger.trace("GSSAPI SaslServer created.");
+ logger.trace("GSSAPI SaslServer created with QOP {}.", qopValue);
return saslServer;
} catch (final UndeclaredThrowableException e) {
final Throwable cause = e.getCause();
@@ -113,11 +115,13 @@ public class KerberosFactory implements AuthenticatorFactory {
if (cause instanceof SaslException) {
throw (SaslException) cause;
} else {
- throw new SaslException("Unexpected failure trying to authenticate using Kerberos", cause);
+ throw new SaslException(String.format("Unexpected failure trying to authenticate using Kerberos with QOP %s",
+ qopValue), cause);
}
} catch (final IOException | InterruptedException e) {
logger.debug("Authentication failed.", e);
- throw new SaslException("Unexpected failure trying to authenticate using Kerberos", e);
+ throw new SaslException(String.format("Unexpected failure trying to authenticate using Kerberos with QOP %s",
+ qopValue), e);
}
}
@@ -129,6 +133,8 @@ public class KerberosFactory implements AuthenticatorFactory {
final String parts[] = KerberosUtil.splitPrincipalIntoParts(servicePrincipal);
final String serviceName = parts[0];
final String serviceHostName = parts[1];
+ final String qopValue = properties.containsKey(Sasl.QOP) ? properties.get(Sasl.QOP).toString() : "auth";
+
// ignore parts[2]; GSSAPI gets the realm info from the ticket
try {
final SaslClient saslClient = ugi.doAs(new PrivilegedExceptionAction<SaslClient>() {
@@ -146,20 +152,20 @@ public class KerberosFactory implements AuthenticatorFactory {
});
}
});
- logger.debug("GSSAPI SaslClient created to authenticate to {} running on {}",
- serviceName, serviceHostName);
+ logger.debug("GSSAPI SaslClient created to authenticate to {} running on {} with QOP value {}",
+ serviceName, serviceHostName, qopValue);
return saslClient;
} catch (final UndeclaredThrowableException e) {
logger.debug("Authentication failed.", e);
- throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
- serviceHostName), e.getCause());
+ throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI with QOP %s",
+ serviceHostName, qopValue), e.getCause());
} catch (final IOException | InterruptedException e) {
logger.debug("Authentication failed.", e);
if (e instanceof SaslException) {
throw (SaslException) e;
}
- throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI",
- serviceHostName), e);
+ throw new SaslException(String.format("Unexpected failure trying to authenticate to %s using GSSAPI with QOP %s",
+ serviceHostName, qopValue), e);
}
}