You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/05/20 23:52:44 UTC
[3/6] drill git commit: DRILL-4335: Apache Drill should support
network encryption.
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index a7ea7b7..2f47538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -69,6 +69,7 @@ import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
+import org.apache.drill.exec.rpc.security.SaslProperties;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -81,6 +82,7 @@ import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.MessageLite;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
@@ -137,18 +139,32 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
*/
public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties,
final UserCredentials credentials) throws RpcException {
- final UserToBitHandshake handshake = UserToBitHandshake.newBuilder()
+ final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
.setSupportComplexTypes(supportComplexTypes)
.setSupportTimeout(true)
.setCredentials(credentials)
.setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
- .setSaslSupport(SaslSupport.SASL_AUTH)
- .setProperties(properties.serializeForServer())
- .build();
+ .setSaslSupport(SaslSupport.SASL_PRIVACY)
+ .setProperties(properties.serializeForServer());
+
+ // Only used for testing purposes.
+ if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) {
+ hsBuilder.setSaslSupport(SaslSupport.valueOf(
+ Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
+ }
+
+ connect(hsBuilder.build(), endpoint).checkedGet();
- connect(handshake, endpoint).checkedGet();
+ // Check if client needs encryption and server is not configured for encryption.
+ final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT)
+ && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
+
+ if(clientNeedsEncryption && !connection.isEncryptionEnabled()) {
+ throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " +
+ "encryption. Please check connection parameter or contact your administrator");
+ }
if (serverAuthMechanisms != null) {
try {
@@ -192,6 +208,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
private CheckedFuture<Void, SaslException> authenticate(final DrillProperties properties) {
final Map<String, String> propertiesMap = properties.stringPropertiesAsMap();
+ // Set the correct QOP and strength properties based on whether the server needs encryption,
+ // and negotiate a buffer size equal to the connection's max wrapped size (the value sent by
+ // the server in its handshake response, when one was provided).
+ propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
+ connection.getMaxWrappedSize()));
+
final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException
final CheckedFuture<Void, SaslException> authFuture =
new AbstractCheckedFuture<Void, SaslException>(authSettable) {
@@ -201,10 +223,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
if (e instanceof ExecutionException) {
final Throwable cause = Throwables.getRootCause(e);
if (cause instanceof SaslException) {
- return new SaslException("Authentication failed: " + cause.getMessage(), cause);
+ return new SaslException(String.format("Authentication failed. [Details: %s, Error %s]",
+ connection.getEncryptionCtxtString(), cause.getMessage()), cause);
}
}
- return new SaslException("Authentication failed unexpectedly.", e);
+ return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]",
+ connection.getEncryptionCtxtString(), e.getMessage()), e);
}
};
@@ -215,11 +239,13 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
try {
factory = getAuthenticatorFactory(properties);
mechanismName = factory.getSimpleName();
- logger.trace("Will try to authenticate to server using {} mechanism.", mechanismName);
+ logger.trace("Will try to authenticate to server using {} mechanism with encryption context {}",
+ mechanismName, connection.getEncryptionCtxtString());
ugi = factory.createAndLoginUser(propertiesMap);
saslClient = factory.createSaslClient(ugi, propertiesMap);
if (saslClient == null) {
- throw new SaslException("Cannot initiate authentication. Insufficient credentials?");
+ throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " +
+ "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName()));
}
connection.setSaslClient(saslClient);
} catch (final IOException e) {
@@ -255,13 +281,12 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
// first, check if a certain mechanism must be used
String authMechanism = properties.getProperty(DrillProperties.AUTH_MECHANISM);
if (authMechanism != null) {
- if (!ClientAuthenticatorProvider.getInstance()
- .containsFactory(authMechanism)) {
+ if (!ClientAuthenticatorProvider.getInstance().containsFactory(authMechanism)) {
throw new SaslException(String.format("Unknown mechanism: %s", authMechanism));
}
if (!mechanismSet.contains(authMechanism.toUpperCase())) {
- throw new SaslException(String.format("Server does not support authentication using: %s",
- authMechanism));
+ throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]",
+ authMechanism, connection.getEncryptionCtxtString()));
}
return ClientAuthenticatorProvider.getInstance()
.getAuthenticatorFactory(authMechanism);
@@ -282,8 +307,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
.getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
}
- throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?",
- serverAuthMechanisms));
+ throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " +
+ "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString()));
}
protected <SEND extends MessageLite, RECEIVE extends MessageLite>
@@ -331,8 +356,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
if (!authComplete) {
// Remote should not be making any requests before authenticating, drop connection
throw new RpcException(String.format("Request of type %d is not allowed without authentication. " +
- "Remote on %s must authenticate before making requests. Connection dropped.",
- rpcType, connection.getRemoteAddress()));
+ "Remote on %s must authenticate before making requests. Connection dropped.",
+ rpcType, connection.getRemoteAddress()));
}
switch (rpcType) {
case RpcType.QUERY_DATA_VALUE:
@@ -361,8 +386,14 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
break;
case AUTH_REQUIRED: {
authComplete = false;
- logger.trace("Server requires authentication before proceeding.");
serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
+ connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
+
+ if (inbound.hasMaxWrappedSize()) {
+ connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+ }
+ logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.",
+ connection.getEncryptionCtxtString()));
break;
}
case AUTH_FAILED:
@@ -384,6 +415,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
public class UserToBitConnection extends AbstractClientConnection {
UserToBitConnection(SocketChannel channel) {
+
+ // By default the connection is not set for encryption. After receiving the handshake message from the
+ // server we set the encryption flag and the max wrapped size on the connection accordingly.
super(channel, "user client");
}
@@ -396,6 +430,16 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
protected Logger getLogger() {
return logger;
}
+
+ @Override
+ public void incConnectionCounter() {
+ // no-op
+ }
+
+ @Override
+ public void decConnectionCounter() {
+ // no-op
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 49a866b..64ac6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -17,11 +17,14 @@
*/
package org.apache.drill.exec.rpc.user;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.AbstractConnectionConfig;
import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.security.AuthenticatorProvider;
import org.apache.drill.exec.server.BootStrapContext;
// config for bit to user connection
@@ -35,24 +38,46 @@ class UserConnectionConfig extends AbstractConnectionConfig {
private final UserServerRequestHandler handler;
UserConnectionConfig(BufferAllocator allocator, BootStrapContext context, UserServerRequestHandler handler)
- throws DrillbitStartupException {
+ throws DrillbitStartupException {
super(allocator, context);
this.handler = handler;
- if (context.getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
- if (getAuthProvider().getAllFactoryNames().isEmpty()) {
+ final DrillConfig config = context.getConfig();
+ final AuthenticatorProvider authProvider = getAuthProvider();
+
+ if (config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED)) {
+ if (authProvider.getAllFactoryNames().isEmpty()) {
throw new DrillbitStartupException("Authentication enabled, but no mechanisms found. Please check " +
"authentication configuration.");
}
authEnabled = true;
- logger.info("Configured all user connections to require authentication using: {}",
- getAuthProvider().getAllFactoryNames());
+
+ // Update encryption related parameters.
+ encryptionContext.setEncryption(config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED));
+ final int maxWrappedSize = config.getInt(ExecConstants.USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE);
+
+ if (maxWrappedSize <= 0) {
+ throw new DrillbitStartupException(String.format("Invalid value configured for " +
+ "user.encryption.sasl.max_wrapped_size. Must be a positive integer in bytes with a recommended max value " +
+ "of %s", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE));
+ } else if (maxWrappedSize > RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE) {
+ logger.warn("The configured value of user.encryption.sasl.max_wrapped_size is too big. This may cause higher" +
+ " memory pressure. [Details: Recommended max value is {}]", RpcConstants.MAX_RECOMMENDED_WRAPPED_SIZE);
+ }
+ encryptionContext.setMaxWrappedSize(maxWrappedSize);
+
+ logger.info("Configured all user connections to require authentication with encryption: {} using: {}",
+ encryptionContext.getEncryptionCtxtString(), authProvider.getAllFactoryNames());
+ } else if (config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED)) {
+ throw new DrillbitStartupException("Invalid security configuration. Encryption using SASL is enabled with " +
+ "authentication disabled. Please check the security.user configurations.");
} else {
authEnabled = false;
}
- impersonationManager = !context.getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED) ? null :
- new InboundImpersonationManager();
+ impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
+ ? null
+ : new InboundImpersonationManager();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
new file mode 100644
index 0000000..ab93e3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcMetrics.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.rpc.AbstractRpcMetrics;
+import org.apache.drill.exec.rpc.RpcMetrics;
+
+/**
+ * Holds metrics related to bit user rpc layer
+ */
+class UserRpcMetrics extends AbstractRpcMetrics {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcMetrics.class);
+
+ // Number of encrypted user client connections to a Drillbit.
+ private static final Counter encryptedConnections = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "user.encrypted");
+
+ private static final Counter unencryptedConnection = DrillMetrics.getRegistry()
+ .counter(CONNECTION_COUNTER_PREFIX + "user.unencrypted");
+
+ private static final RpcMetrics INSTANCE = new UserRpcMetrics();
+
+ // prevent instantiation
+ private UserRpcMetrics() {
+ }
+
+ public static RpcMetrics getInstance() {
+ return INSTANCE;
+ }
+
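+ /**
+ * Should only be called when the first access to getInstance is made, which in this case is inside {@link UserServer}.
+ * BitToUserConnection, which uses the singleton instance, should not call initialize.
+ *
+ * @param useEncryptedCounter true to count connections against the encrypted counter, false for the unencrypted one
+ * @param allocator allocator whose metrics are registered under the "bit.user." prefix
+ */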
+ @Override
+ public void initialize(boolean useEncryptedCounter, BufferAllocator allocator) {
+ this.useEncryptedCounter = useEncryptedCounter;
+ registerAllocatorMetrics(allocator);
+ }
+
+
+ @Override
+ public void addConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.inc();
+ } else {
+ unencryptedConnection.inc();
+ }
+ }
+
+ @Override
+ public void decConnectionCount() {
+ if (useEncryptedCounter) {
+ encryptedConnections.dec();
+ } else {
+ unencryptedConnection.dec();
+ }
+ }
+
+ private void registerAllocatorMetrics(final BufferAllocator allocator) {
+ registerAllocatorMetrics(allocator, ALLOCATOR_METRICS_PREFIX + "bit.user.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 9f0d502..543145f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.OutboundRpcMessage;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
+import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
@@ -78,6 +79,9 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
eventLoopGroup);
this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
this.userWorker = worker;
+
+ // Initialize Singleton instance of UserRpcMetrics.
+ ((UserRpcMetrics)UserRpcMetrics.getInstance()).initialize(config.isEncryptionEnabled(), allocator);
}
@Override
@@ -149,7 +153,7 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
void disableReadTimeout() {
- getChannel().pipeline().remove(BasicServer.TIMEOUT_HANDLER);
+ getChannel().pipeline().remove(RpcConstants.TIMEOUT_HANDLER);
}
void setHandshake(final UserToBitHandshake inbound) {
@@ -186,6 +190,10 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
if (config.getImpersonationManager() != null && targetName != null) {
config.getImpersonationManager().replaceUserOnSession(targetName, session);
}
+
+ // Increase the corresponding connection counter.
+ // For older clients we call this method directly.
+ incConnectionCounter();
}
@Override
@@ -237,6 +245,16 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
cleanup();
super.close();
}
+
+ @Override
+ public void incConnectionCounter() {
+ UserRpcMetrics.getInstance().addConnectionCount();
+ }
+
+ @Override
+ public void decConnectionCounter() {
+ UserRpcMetrics.getInstance().decConnectionCount();
+ }
}
@Override
@@ -295,7 +313,16 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
final boolean clientSupportsSasl = inbound.hasSaslSupport() &&
- (inbound.getSaslSupport().ordinal() >= SaslSupport.SASL_AUTH.ordinal());
+ (inbound.getSaslSupport().ordinal() > SaslSupport.UNKNOWN_SASL_SUPPORT.ordinal());
+
+ final int saslSupportOrdinal = (clientSupportsSasl) ? inbound.getSaslSupport().ordinal()
+ : SaslSupport.UNKNOWN_SASL_SUPPORT.ordinal();
+
+ if (saslSupportOrdinal <= SaslSupport.SASL_AUTH.ordinal() && config.isEncryptionEnabled()) {
+ throw new UserAuthenticationException("The server doesn't allow client without encryption support." +
+ " Please upgrade your client or talk to your system administrator.");
+ }
+
if (!clientSupportsSasl) { // for backward compatibility < 1.10
final String userName = inbound.getCredentials().getUserName();
if (logger.isTraceEnabled()) {
@@ -335,9 +362,14 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
}
- // mention server's authentication capabilities
+ // Offer all the configured mechanisms to client. If certain mechanism doesn't support encryption
+ // like PLAIN, those should fail during the SASL handshake negotiation.
respBuilder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
+ // set the encrypted flag in handshake message. For older clients this field is optional so will be ignored
+ respBuilder.setEncrypted(connection.isEncryptionEnabled());
+ respBuilder.setMaxWrappedSize(connection.getMaxWrappedSize());
+
// for now, this means PLAIN credentials will be sent over twice
// (during handshake and during sasl exchange)
respBuilder.setStatus(HandshakeStatus.AUTH_REQUIRED);
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
index c7a1338..a79c1df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/security/UserAuthenticatorFactory.java
@@ -46,6 +46,12 @@ public class UserAuthenticatorFactory {
*/
public static UserAuthenticator createAuthenticator(final DrillConfig config, ScanResult scan)
throws DrillbitStartupException {
+
+ if(!config.hasPath(USER_AUTHENTICATOR_IMPL)) {
+ throw new DrillbitStartupException(String.format("BOOT option '%s' is missing in config.",
+ USER_AUTHENTICATOR_IMPL));
+ }
+
final String authImplConfigured = config.getString(USER_AUTHENTICATOR_IMPL);
if (Strings.isNullOrEmpty(authImplConfigured)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index ba0f212..84c471e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -29,6 +29,8 @@ import javax.xml.bind.annotation.XmlRootElement;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
import org.apache.drill.exec.work.WorkManager;
@@ -61,6 +63,10 @@ public class DrillRoot {
final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint();
final String currentVersion = currentDrillbit.getVersion();
+ final DrillConfig config = work.getContext().getConfig();
+ final boolean userEncryptionEnabled = config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED);
+ final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
+
for (DrillbitEndpoint endpoint : work.getContext().getBits()) {
final DrillbitInfo drillbit = new DrillbitInfo(endpoint,
currentDrillbit.equals(endpoint),
@@ -71,7 +77,8 @@ public class DrillRoot {
drillbits.add(drillbit);
}
- return new ClusterInfo(drillbits, currentVersion, mismatchedVersions);
+ return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
+ userEncryptionEnabled, bitEncryptionEnabled);
}
@XmlRootElement
@@ -79,14 +86,20 @@ public class DrillRoot {
private final Collection<DrillbitInfo> drillbits;
private final String currentVersion;
private final Collection<String> mismatchedVersions;
+ private final boolean userEncryptionEnabled;
+ private final boolean bitEncryptionEnabled;
@JsonCreator
public ClusterInfo(Collection<DrillbitInfo> drillbits,
String currentVersion,
- Collection<String> mismatchedVersions) {
+ Collection<String> mismatchedVersions,
+ boolean userEncryption,
+ boolean bitEncryption) {
this.drillbits = Sets.newTreeSet(drillbits);
this.currentVersion = currentVersion;
this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
+ this.userEncryptionEnabled = userEncryption;
+ this.bitEncryptionEnabled = bitEncryption;
}
public Collection<DrillbitInfo> getDrillbits() {
@@ -100,6 +113,10 @@ public class DrillRoot {
public Collection<String> getMismatchedVersions() {
return Sets.newTreeSet(mismatchedVersions);
}
+
+ public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
+
+ public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
}
public static class DrillbitInfo implements Comparable<DrillbitInfo> {
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 9fa3c27..07c54ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.service;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.channel.EventLoopGroup;
@@ -29,12 +30,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.exec.rpc.control.Controller;
@@ -44,8 +43,6 @@ import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.WorkManager;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Stopwatch;
public class ServiceEngine implements AutoCloseable {
@@ -84,53 +81,8 @@ public class ServiceEngine implements AutoCloseable {
intialUserPort = context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT);
this.allowPortHunting = allowPortHunting;
this.isDistributedMode = isDistributedMode;
-
- registerMetrics(context.getMetrics());
}
- private void registerMetrics(final MetricRegistry registry) {
- final String prefix = PooledByteBufAllocatorL.METRIC_PREFIX + "rpc.";
- DrillMetrics.register(prefix + "user.used", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return userAllocator.getAllocatedMemory();
- }
- });
- DrillMetrics.register(prefix + "user.peak", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return userAllocator.getPeakMemoryAllocation();
- }
- });
- DrillMetrics.register(prefix + "bit.control.used", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return controlAllocator.getAllocatedMemory();
- }
- });
- DrillMetrics.register(prefix + "bit.control.peak", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return controlAllocator.getPeakMemoryAllocation();
- }
- });
-
- DrillMetrics.register(prefix + "bit.data.used", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return dataAllocator.getAllocatedMemory();
- }
- });
- DrillMetrics.register(prefix + "bit.data.peak", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return dataAllocator.getPeakMemoryAllocation();
- }
- });
-
- }
-
-
private static BufferAllocator newAllocator(
BootStrapContext context, String name, String initReservation, String maxAllocation) {
return context.getAllocator().newChildAllocator(
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 3d66d19..19e1b1f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -152,6 +152,14 @@ drill.exec: {
enabled: false
use_login_principal: false
}
+ security.user.encryption.sasl {
+ enabled : false,
+ max_wrapped_size : 65536
+ }
+ security.bit.encryption.sasl {
+ enabled : false,
+ max_wrapped_size : 65536
+ }
trace: {
directory: "/tmp/drill-trace",
filesystem: "file:///"
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/main/resources/rest/index.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl
index 3175479..d1aa844 100644
--- a/exec/java-exec/src/main/resources/rest/index.ftl
+++ b/exec/java-exec/src/main/resources/rest/index.ftl
@@ -69,6 +69,27 @@
</tbody>
</table>
</div>
+ </div>
+ </div>
+
+ <div class="row">
+ <div class="col-md-12">
+ <h3>Encryption Info <span class="label label-primary"></span></h3>
+ <div class="table-responsive">
+ <table class="table table-hover">
+ <tbody>
+ <tr>
+ <td>Client to Bit Encryption:</td>
+ <td>${model.isUserEncryptionEnabled()?string("enabled", "disabled")}</td>
+ </tr>
+ <tr>
+ <td>Bit to Bit Encryption:</td>
+ <td>${model.isBitEncryptionEnabled()?string("enabled", "disabled")}</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ </div>
</div>
</#macro>
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index c3cc2da..a3ea198 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -48,88 +49,45 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.hadoop.security.UgiTestUtil;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.apache.kerby.kerberos.kerb.KrbException;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertTrue;
@Ignore("See DRILL-5387")
public class TestBitBitKerberos extends BaseTestQuery {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitBitKerberos.class);
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitBitKerberos.class);
+ private static KerberosHelper krbHelper;
private static DrillConfig newConfig;
- private static File workspace;
+ private static BootStrapContext c1;
+ private static FragmentManager manager;
+ private int port = 1234;
- private static File kdcDir;
- private static SimpleKdcServer kdc;
- private static int kdcPort;
-
- private static final String HOSTNAME = "localhost";
- private static final String REALM = "EXAMPLE.COM";
-
- private static final String SERVER_SHORT_NAME = System.getProperty("user.name");
- private static final String SERVER_PRINCIPAL = SERVER_SHORT_NAME + "/" + HOSTNAME + "@" + REALM;
-
- private static File keytabDir;
- private static File serverKeytab;
-
- private static boolean kdcStarted;
-
- @SuppressWarnings("restriction")
@BeforeClass
- public static void setupKdc() throws Exception {
- kdc = new SimpleKdcServer();
- workspace = new File(getTempDir("kerberos_target"));
-
- kdcDir = new File(workspace, TestBitBitKerberos.class.getSimpleName());
- kdcDir.mkdirs();
- kdc.setWorkDir(kdcDir);
-
- kdc.setKdcHost(HOSTNAME);
- kdcPort = getFreePort();
- kdc.setAllowTcp(true);
- kdc.setAllowUdp(false);
- kdc.setKdcTcpPort(kdcPort);
-
- logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
-
- kdc.init();
- kdc.start();
- kdcStarted = true;
+ public static void setupTest() throws Exception {
final Config config = DrillConfig.create(cloneDefaultTestConfigProperties());
- keytabDir = new File(workspace, TestBitBitKerberos.class.getSimpleName() + "_keytabs");
- keytabDir.mkdirs();
- setupUsers(keytabDir);
-
- // Kerby sets "java.security.krb5.conf" for us!
- System.clearProperty("java.security.auth.login.config");
- System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
- // Uncomment the following lines for debugging.
- // System.setProperty("sun.security.spnego.debug", "true");
- // System.setProperty("sun.security.krb5.debug", "true");
+ krbHelper = new KerberosHelper(TestBitBitKerberos.class.getSimpleName());
+ krbHelper.setupKdc();
newConfig = new DrillConfig(
config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
@@ -141,9 +99,9 @@ public class TestBitBitKerberos extends BaseTestQuery {
.withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
ConfigValueFactory.fromAnyRef(true))
.withValue(BootStrapContext.SERVICE_PRINCIPAL,
- ConfigValueFactory.fromAnyRef(SERVER_PRINCIPAL))
+ ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
.withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
- ConfigValueFactory.fromAnyRef(serverKeytab.toString())),
+ ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
false);
// Ignore the compile time warning caused by the code below.
@@ -159,65 +117,13 @@ public class TestBitBitKerberos extends BaseTestQuery {
defaultRealm.set(null, KerberosUtil.getDefaultRealm());
updateTestCluster(1, newConfig);
- }
-
- private static int getFreePort() throws IOException {
- @SuppressWarnings("resource")
- ServerSocket s = null;
- try {
- s = new ServerSocket(0);
- s.setReuseAddress(true);
- return s.getLocalPort();
- } finally {
- if (s != null) {
- s.close();
- }
- }
- }
-
- private static void setupUsers(File keytabDir) throws KrbException {
- // Create the server user
- String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
- serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
- logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
- setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
- }
-
- private static void setupUser(SimpleKdcServer kdc, File keytab, String principal)
- throws KrbException {
- kdc.createPrincipal(principal);
- kdc.exportPrincipal(principal, keytab);
- }
-
- @AfterClass
- public static void stopKdc() throws Exception {
- if (kdcStarted) {
- logger.info("Stopping KDC on {}", kdcPort);
- kdc.stop();
- }
-
- deleteIfExists(serverKeytab);
- deleteIfExists(keytabDir);
- deleteIfExists(kdcDir);
- deleteIfExists(workspace);
- UgiTestUtil.resetUgi();
- }
-
- private static void deleteIfExists(File file) throws IOException {
- if (file != null) {
- Files.deleteIfExists(file.toPath());
- }
- }
-
- @Test
- public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
ScanResult result = ClassPathScanner.fromPrescan(newConfig);
- @SuppressWarnings("resource")
- final BootStrapContext c1 = new BootStrapContext(newConfig, result);
- @SuppressWarnings({ "unused", "resource" })
- final BootStrapContext c2 = new BootStrapContext(newConfig, result);
+ c1 = new BootStrapContext(newConfig, result);
+ setupFragmentContextAndManager();
+ }
+ private static void setupFragmentContextAndManager() {
final FragmentContext fcontext = new MockUp<FragmentContext>(){
@SuppressWarnings("unused")
BufferAllocator getAllocator(){
@@ -225,7 +131,7 @@ public class TestBitBitKerberos extends BaseTestQuery {
}
}.getMockInstance();
- final FragmentManager manager = new MockUp<FragmentManager>(){
+ manager = new MockUp<FragmentManager>(){
int v = 0;
@Mock
@@ -252,36 +158,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
}
}.getMockInstance();
-
-
- new NonStrictExpectations() {{
- workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
- workBus.getFragmentManager( (FragmentHandle) any); result = manager;
- }};
-
- int port = 1234;
-
- DataConnectionConfig config = new DataConnectionConfig(c1.getAllocator(), c1,
- new DataServerRequestHandler(workBus, bee));
- @SuppressWarnings("resource")
- DataServer server = new DataServer(config);
-
- port = server.bind(port, true);
- DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
- @SuppressWarnings("resource")
- DataConnectionManager connectionManager = new DataConnectionManager(ep, config);
- DataTunnel tunnel = new DataTunnel(connectionManager);
- AtomicLong max = new AtomicLong(0);
- for (int i = 0; i < 40; i++) {
- long t1 = System.currentTimeMillis();
- tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
- 1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
- System.out.println(System.currentTimeMillis() - t1);
- // System.out.println("sent.");
- }
- System.out.println(String.format("Max time: %d", max.get()));
- assertTrue(max.get() > 2700);
- Thread.sleep(5000);
}
private static WritableBatch getRandomBatch(BufferAllocator allocator, int records) {
@@ -289,8 +165,8 @@ public class TestBitBitKerberos extends BaseTestQuery {
for (int i = 0; i < 5; i++) {
@SuppressWarnings("resource")
Float8Vector v = (Float8Vector) TypeHelper.getNewVector(
- MaterializedField.create("a", Types.required(MinorType.FLOAT8)),
- allocator);
+ MaterializedField.create("a", Types.required(MinorType.FLOAT8)),
+ allocator);
v.allocateNew(records);
v.getMutator().generateTestData(records);
vectors.add(v);
@@ -302,7 +178,7 @@ public class TestBitBitKerberos extends BaseTestQuery {
private AtomicLong max;
private Stopwatch watch = Stopwatch.createStarted();
- public TimingOutcome(AtomicLong max) {
+ TimingOutcome(AtomicLong max) {
super();
this.max = max;
}
@@ -315,7 +191,8 @@ public class TestBitBitKerberos extends BaseTestQuery {
@Override
public void success(Ack value, ByteBuf buffer) {
long micros = watch.elapsed(TimeUnit.MILLISECONDS);
- System.out.println(String.format("Total time to send: %d, start time %d", micros, System.currentTimeMillis() - micros));
+ System.out.println(String.format("Total time to send: %d, start time %d", micros,
+ System.currentTimeMillis() - micros));
while (true) {
long nowMax = max.get();
if (nowMax < micros) {
@@ -333,4 +210,161 @@ public class TestBitBitKerberos extends BaseTestQuery {
// TODO(We don't have any interrupts in test code)
}
}
+
+  /**
+   * Sends record batches between a {@link DataServer} and a {@link DataTunnel} that are both built from the shared
+   * {@code c1} context created in {@code setupTest()}.
+   *
+   * @param bee injected worker bee handed to the data request handler
+   * @param workBus injected event bus; every fragment-manager lookup is answered with the shared mock manager
+   */
+  @Test
+  public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
+    sendBatchesAndCheckTiming(bee, workBus);
+  }
+
+  /**
+   * Restarts the test cluster with bit-to-bit SASL encryption enabled and then runs the same batch transfer as
+   * {@link #success}.
+   *
+   * NOTE(review): the data connection is still built from {@code c1}, which {@code setupTest()} created from the
+   * earlier {@code newConfig}; reassigning {@code newConfig} here does not rebuild it. Confirm that this transfer
+   * really runs with encryption enabled.
+   */
+  @Test
+  public void successEncryption(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
+    newConfig = new DrillConfig(
+        config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("kerberos")))
+            .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+                ConfigValueFactory.fromAnyRef("kerberos"))
+            .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+                ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+            .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+                ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
+        false);
+
+    updateTestCluster(1, newConfig);
+
+    sendBatchesAndCheckTiming(bee, workBus);
+  }
+
+  /**
+   * Same as {@link #successEncryption} but additionally sets the maximum SASL wrapped size to 100000
+   * (presumably small enough that a batch has to be split into several wrapped chunks - confirm).
+   *
+   * NOTE(review): as in {@link #successEncryption}, the data connection is built from {@code c1}, which predates
+   * the configuration assigned here. Confirm that the chunked path is actually exercised.
+   */
+  @Test
+  public void successEncryptionChunkMode(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus)
+      throws Exception {
+    newConfig = new DrillConfig(
+        config.withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("kerberos")))
+            .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+                ConfigValueFactory.fromAnyRef("kerberos"))
+            .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(ExecConstants.BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE,
+                ConfigValueFactory.fromAnyRef(100000))
+            .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+                ConfigValueFactory.fromAnyRef(true))
+            .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+                ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+            .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+                ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())),
+        false);
+
+    updateTestCluster(1, newConfig);
+
+    sendBatchesAndCheckTiming(bee, workBus);
+  }
+
+  /**
+   * Binds a {@link DataServer} built from {@code c1}, sends 40 batches of 5000 records each to it through a
+   * {@link DataTunnel} and asserts that the slowest acknowledgement took more than 2700 ms.
+   *
+   * @param bee worker bee handed to the data request handler
+   * @param workBus mocked event bus on which the fragment-manager expectations are recorded
+   */
+  private void sendBatchesAndCheckTiming(final WorkerBee bee, final WorkEventBus workBus) throws Exception {
+    // Every fragment-manager lookup on the mocked bus resolves to the shared mock manager.
+    new NonStrictExpectations() {{
+      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
+      workBus.getFragmentManager( (FragmentHandle) any); result = manager;
+    }};
+
+    DataConnectionConfig dataConfig = new DataConnectionConfig(c1.getAllocator(), c1,
+        new DataServerRequestHandler(workBus, bee));
+    DataServer server = new DataServer(dataConfig);
+
+    // bind() hands back the port the client must use (presumably the port actually bound - confirm).
+    port = server.bind(port, true);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build();
+    DataConnectionManager connectionManager = new DataConnectionManager(ep, dataConfig);
+    DataTunnel tunnel = new DataTunnel(connectionManager);
+    AtomicLong max = new AtomicLong(0);
+    for (int i = 0; i < 40; i++) {
+      long t1 = System.currentTimeMillis();
+      tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1,
+          1, 1, 1, getRandomBatch(c1.getAllocator(), 5000)));
+      System.out.println(System.currentTimeMillis() - t1);
+    }
+    System.out.println(String.format("Max time: %d", max.get()));
+    // NOTE(review): the 2700 ms floor presumably relies on a delay inside the mocked fragment manager - confirm.
+    assertTrue(max.get() > 2700);
+    Thread.sleep(5000);
+  }
+
+  /**
+   * Restarts the test cluster with bit-to-bit SASL encryption enabled while "plain" is the only configured
+   * authentication mechanism, and expects the restart to fail with a {@link DrillbitStartupException} as the
+   * cause of the thrown exception.
+   */
+  @Test
+  public void failureEncryptionOnlyPlainMechanism() throws Exception {
+    try {
+      final Config plainOnlySettings = config
+          .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("plain")))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+              ConfigValueFactory.fromAnyRef("kerberos"))
+          .withValue(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+          .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+              ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()));
+      newConfig = new DrillConfig(plainOnlySettings, false);
+
+      // The restart itself is the operation under test; reaching fail() means it wrongly succeeded.
+      updateTestCluster(1, newConfig);
+      fail();
+    } catch (Exception startupFailure) {
+      final Throwable cause = startupFailure.getCause();
+      assertTrue(cause instanceof DrillbitStartupException);
+    }
+  }
+
+  /**
+   * Stops the embedded KDC and removes its files once all tests in this class have run.
+   */
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    // JUnit runs @AfterClass methods even when @BeforeClass threw, and setupTest() assigns krbHelper only after
+    // its first statement, so guard against a helper that was never created.
+    if (krbHelper != null) {
+      krbHelper.stopKdc();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
index b3c15bd..bdc3230 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
@@ -17,19 +17,13 @@
*/
package org.apache.drill.exec.rpc.data;
-import static org.junit.Assert.assertTrue;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.NonStrictExpectations;
-
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -57,8 +51,12 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.junit.Test;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertTrue;
public class TestBitRpc extends ExecTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
new file mode 100644
index 0000000..3320cef
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/security/KerberosHelper.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.security;
+
+
+import org.apache.kerby.kerberos.kerb.KrbException;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+
+import static org.apache.drill.exec.ExecTest.getTempDir;
+
+/**
+ * Test helper that runs an embedded Kerby {@link SimpleKdcServer} and exports keytabs for one client and one
+ * server principal. Call {@link #setupKdc()} before the tests and {@link #stopKdc()} once they are done.
+ */
+public class KerberosHelper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KerberosHelper.class);
+
+  /** Parent of the KDC work directory and the keytab directory; assigned by {@link #setupKdc()}. */
+  public File workspace;
+
+  private File kdcDir;
+  private SimpleKdcServer kdc;
+  private int kdcPort;
+
+  private final String HOSTNAME = "localhost";
+
+  public final String CLIENT_SHORT_NAME = "testUser";
+  /** {@link #CLIENT_SHORT_NAME} qualified with the realm. */
+  public final String CLIENT_PRINCIPAL;
+
+  /** Built as {@code <"user.name" system property>/<HOSTNAME>@<realm>}. */
+  public String SERVER_PRINCIPAL;
+  private final String testName;
+
+  private File keytabDir;
+  /** Keytab exported for the client principal; assigned by {@link #setupKdc()}. */
+  public File clientKeytab;
+  /** Keytab exported for the server principal; assigned by {@link #setupKdc()}. */
+  public File serverKeytab;
+
+  private boolean kdcStarted;
+
+  /**
+   * @param testName name used for this helper's KDC and keytab directories inside the workspace
+   */
+  public KerberosHelper(final String testName) {
+    final String realm = "EXAMPLE.COM";
+    CLIENT_PRINCIPAL = CLIENT_SHORT_NAME + "@" + realm;
+    final String serverShortName = System.getProperty("user.name");
+    SERVER_PRINCIPAL = serverShortName + "/" + HOSTNAME + "@" + realm;
+    this.testName = testName;
+  }
+
+  /**
+   * Starts a TCP-only KDC on a free local port, creates the client and server principals together with their
+   * keytabs, and adjusts the JVM-wide JAAS system properties.
+   *
+   * @throws Exception if a directory cannot be created or the KDC cannot be initialized, started or populated
+   */
+  public void setupKdc() throws Exception {
+    kdc = new SimpleKdcServer();
+    workspace = new File(getTempDir("kerberos_target"));
+
+    kdcDir = new File(workspace, testName);
+    createDirectory(kdcDir, "kdc");
+    kdc.setWorkDir(kdcDir);
+
+    kdc.setKdcHost(HOSTNAME);
+    kdcPort = getFreePort();
+    kdc.setAllowTcp(true);
+    kdc.setAllowUdp(false);
+    kdc.setKdcTcpPort(kdcPort);
+
+    logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
+
+    kdc.init();
+    kdc.start();
+    kdcStarted = true;
+
+    keytabDir = new File(workspace, testName + "_keytabs");
+    createDirectory(keytabDir, "keytab");
+    setupUsers(keytabDir);
+
+    // Kerby sets "java.security.krb5.conf" for us!
+    System.clearProperty("java.security.auth.login.config");
+    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
+    // Uncomment the following lines for debugging.
+    // System.setProperty("sun.security.spnego.debug", "true");
+    // System.setProperty("sun.security.krb5.debug", "true");
+  }
+
+  /**
+   * Makes sure {@code dir} exists as a directory.
+   *
+   * @param dir directory to create, including any missing parents
+   * @param description short label used in the failure message
+   * @throws IOException if the directory does not exist and could not be created
+   */
+  private static void createDirectory(File dir, String description) throws IOException {
+    // File.mkdirs() also returns false when the directory is already there, which is not a failure.
+    if (!dir.mkdirs() && !dir.isDirectory()) {
+      throw new IOException(
+          String.format("Failed to create the %s directory %s", description, dir.getAbsolutePath()));
+    }
+  }
+
+  /**
+   * Asks the OS for an unused port by briefly binding a server socket to port 0. The socket is closed before
+   * returning, so the port is only known to have been free at that moment.
+   */
+  private int getFreePort() throws IOException {
+    try (ServerSocket s = new ServerSocket(0)) {
+      s.setReuseAddress(true);
+      return s.getLocalPort();
+    }
+  }
+
+  /** Creates the client and server principals and exports each one to its own keytab in {@code keytabDir}. */
+  private void setupUsers(File keytabDir) throws KrbException {
+    // Create the client user
+    String clientPrincipal = CLIENT_PRINCIPAL.substring(0, CLIENT_PRINCIPAL.indexOf('@'));
+    clientKeytab = new File(keytabDir, clientPrincipal.replace('/', '_') + ".keytab");
+    logger.debug("Creating {} with keytab {}", clientPrincipal, clientKeytab);
+    setupUser(kdc, clientKeytab, clientPrincipal);
+
+    // Create the server user
+    String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
+    serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
+    logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
+    setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
+  }
+
+  /** Registers {@code principal} with the KDC and writes its keys to {@code keytab}. */
+  private void setupUser(SimpleKdcServer kdc, File keytab, String principal)
+      throws KrbException {
+    kdc.createPrincipal(principal);
+    kdc.exportPrincipal(principal, keytab);
+  }
+
+  /**
+   * Stops the KDC if it is running and deletes the keytabs and directories created by {@link #setupKdc()}.
+   * Directories are removed with {@link Files#deleteIfExists}, which throws if a directory is not empty.
+   *
+   * @throws Exception if the KDC cannot be stopped or a file cannot be deleted
+   */
+  public void stopKdc() throws Exception {
+    if (kdcStarted) {
+      logger.info("Stopping KDC on {}", kdcPort);
+      kdc.stop();
+      // Remember the stop so that calling this method again does not stop the KDC twice.
+      kdcStarted = false;
+    }
+
+    deleteIfExists(clientKeytab);
+    deleteIfExists(serverKeytab);
+    deleteIfExists(keytabDir);
+    deleteIfExists(kdcDir);
+    deleteIfExists(workspace);
+  }
+
+  /** Deletes {@code file} if it is non-null and exists; a null argument (setup never got that far) is ignored. */
+  private void deleteIfExists(File file) throws IOException {
+    if (file != null) {
+      Files.deleteIfExists(file.toPath());
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/ce8bbc01/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
index 177268c..3fad005 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberos.java
@@ -23,99 +23,48 @@ import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.rpc.security.KerberosHelper;
import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.hadoop.security.UgiTestUtil;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.apache.kerby.kerberos.kerb.KrbException;
import org.apache.kerby.kerberos.kerb.client.JaasKrbUtil;
-import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import javax.security.auth.Subject;
-import java.io.File;
-import java.io.IOException;
import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;
@Ignore("See DRILL-5387")
public class TestUserBitKerberos extends BaseTestQuery {
- private static final org.slf4j.Logger logger =
- org.slf4j.LoggerFactory.getLogger(TestUserBitKerberos.class);
+ //private static final org.slf4j.Logger logger =org.slf4j.LoggerFactory.getLogger(TestUserBitKerberos.class);
- private static File workspace;
-
- private static File kdcDir;
- private static SimpleKdcServer kdc;
- private static int kdcPort;
-
- private static final String HOSTNAME = "localhost";
- private static final String REALM = "EXAMPLE.COM";
-
- private static final String CLIENT_SHORT_NAME = "testUser";
- private static final String CLIENT_PRINCIPAL = CLIENT_SHORT_NAME + "@" + REALM;
- private static final String SERVER_SHORT_NAME = System.getProperty("user.name");
- private static final String SERVER_PRINCIPAL = SERVER_SHORT_NAME + "/" + HOSTNAME + "@" + REALM;
-
- private static File keytabDir;
- private static File clientKeytab;
- private static File serverKeytab;
-
- private static boolean kdcStarted;
+ private static KerberosHelper krbHelper;
@BeforeClass
- public static void setupKdc() throws Exception {
- kdc = new SimpleKdcServer();
- workspace = new File(getTempDir("kerberos_target"));
-
- kdcDir = new File(workspace, TestUserBitKerberos.class.getSimpleName());
- kdcDir.mkdirs();
- kdc.setWorkDir(kdcDir);
-
- kdc.setKdcHost(HOSTNAME);
- kdcPort = getFreePort();
- kdc.setAllowTcp(true);
- kdc.setAllowUdp(false);
- kdc.setKdcTcpPort(kdcPort);
+ public static void setupTest() throws Exception {
- logger.debug("Starting KDC server at {}:{}", HOSTNAME, kdcPort);
-
- kdc.init();
- kdc.start();
- kdcStarted = true;
-
-
- keytabDir = new File(workspace, TestUserBitKerberos.class.getSimpleName()
- + "_keytabs");
- keytabDir.mkdirs();
- setupUsers(keytabDir);
-
- // Kerby sets "java.security.krb5.conf" for us!
- System.clearProperty("java.security.auth.login.config");
- System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
- // Uncomment the following lines for debugging.
- // System.setProperty("sun.security.spnego.debug", "true");
- // System.setProperty("sun.security.krb5.debug", "true");
+ krbHelper = new KerberosHelper(TestUserBitKerberos.class.getSimpleName());
+ krbHelper.setupKdc();
+ // Create a new DrillConfig which has user authentication enabled and authenticator set to
+ // UserAuthenticatorTestImpl.
final DrillConfig newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
- .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
- ConfigValueFactory.fromAnyRef(true))
- .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
- ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
- .withValue(BootStrapContext.SERVICE_PRINCIPAL,
- ConfigValueFactory.fromAnyRef(SERVER_PRINCIPAL))
- .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
- ConfigValueFactory.fromAnyRef(serverKeytab.toString()))
- .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
- ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))),
- false);
+ .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+ ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+ ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+ .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+ ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+ .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+ ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+ .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+ ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))),
+ false);
final Properties connectionProps = new Properties();
connectionProps.setProperty(DrillProperties.USER, "anonymous");
@@ -136,66 +85,12 @@ public class TestUserBitKerberos extends BaseTestQuery {
updateTestCluster(1, newConfig, connectionProps);
}
- private static int getFreePort() throws IOException {
- ServerSocket s = null;
- try {
- s = new ServerSocket(0);
- s.setReuseAddress(true);
- return s.getLocalPort();
- } finally {
- if (s != null) {
- s.close();
- }
- }
- }
-
- private static void setupUsers(File keytabDir) throws KrbException {
- // Create the client user
- String clientPrincipal = CLIENT_PRINCIPAL.substring(0, CLIENT_PRINCIPAL.indexOf('@'));
- clientKeytab = new File(keytabDir, clientPrincipal.replace('/', '_') + ".keytab");
- logger.debug("Creating {} with keytab {}", clientPrincipal, clientKeytab);
- setupUser(kdc, clientKeytab, clientPrincipal);
-
- // Create the server user
- String serverPrincipal = SERVER_PRINCIPAL.substring(0, SERVER_PRINCIPAL.indexOf('@'));
- serverKeytab = new File(keytabDir, serverPrincipal.replace('/', '_') + ".keytab");
- logger.debug("Creating {} with keytab {}", SERVER_PRINCIPAL, serverKeytab);
- setupUser(kdc, serverKeytab, SERVER_PRINCIPAL);
- }
-
- private static void setupUser(SimpleKdcServer kdc, File keytab, String principal)
- throws KrbException {
- kdc.createPrincipal(principal);
- kdc.exportPrincipal(principal, keytab);
- }
-
- @AfterClass
- public static void stopKdc() throws Exception {
- if (kdcStarted) {
- logger.info("Stopping KDC on {}", kdcPort);
- kdc.stop();
- }
-
- deleteIfExists(clientKeytab);
- deleteIfExists(serverKeytab);
- deleteIfExists(keytabDir);
- deleteIfExists(kdcDir);
- deleteIfExists(workspace);
- UgiTestUtil.resetUgi();
- }
-
- private static void deleteIfExists(File file) throws IOException {
- if (file != null) {
- Files.deleteIfExists(file.toPath());
- }
- }
-
@Test
public void successKeytab() throws Exception {
final Properties connectionProps = new Properties();
- connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, SERVER_PRINCIPAL);
- connectionProps.setProperty(DrillProperties.USER, CLIENT_PRINCIPAL);
- connectionProps.setProperty(DrillProperties.KEYTAB, clientKeytab.getAbsolutePath());
+ connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+ connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+ connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
updateClient(connectionProps);
// Run few queries using the new client
@@ -203,7 +98,7 @@ public class TestUserBitKerberos extends BaseTestQuery {
.sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
.unOrdered()
.baselineColumns("session_user")
- .baselineValues(CLIENT_SHORT_NAME)
+ .baselineValues(krbHelper.CLIENT_SHORT_NAME)
.go();
test("SHOW SCHEMAS");
test("USE INFORMATION_SCHEMA");
@@ -215,9 +110,10 @@ public class TestUserBitKerberos extends BaseTestQuery {
@Test
public void successTicket() throws Exception {
final Properties connectionProps = new Properties();
- connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, SERVER_PRINCIPAL);
+ connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
connectionProps.setProperty(DrillProperties.KERBEROS_FROM_SUBJECT, "true");
- final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(CLIENT_PRINCIPAL, clientKeytab.getAbsoluteFile());
+ final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL,
+ krbHelper.clientKeytab.getAbsoluteFile());
Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() {
@Override
@@ -232,7 +128,7 @@ public class TestUserBitKerberos extends BaseTestQuery {
.sqlQuery("SELECT session_user FROM (SELECT * FROM sys.drillbits LIMIT 1)")
.unOrdered()
.baselineColumns("session_user")
- .baselineValues(CLIENT_SHORT_NAME)
+ .baselineValues(krbHelper.CLIENT_SHORT_NAME)
.go();
test("SHOW SCHEMAS");
test("USE INFORMATION_SCHEMA");
@@ -240,4 +136,9 @@ public class TestUserBitKerberos extends BaseTestQuery {
test("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME LIKE 'COLUMNS'");
test("SELECT * FROM cp.`region.json` LIMIT 5");
}
+
+  /**
+   * Stops the embedded KDC and removes its files once all tests in this class have run.
+   */
+  @AfterClass
+  public static void cleanTest() throws Exception {
+    // JUnit runs @AfterClass methods even when @BeforeClass threw, so krbHelper may never have been assigned.
+    if (krbHelper != null) {
+      krbHelper.stopKdc();
+    }
+  }
}