You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/02/25 07:18:16 UTC
[23/29] drill git commit: DRILL-4280: CORE (bit to bit authentication,
control)
DRILL-4280: CORE (bit to bit authentication, control)
+ Support authentication in ControlServer and ControlClient
+ Add AuthenticationCommand as an initial command after handshake
and before the command that initiates a connection
+ Add ControlConnectionConfig to encapsulate configuration
+ ControlMessageHandler now implements RequestHandler
control
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/180dd564
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/180dd564
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/180dd564
Branch: refs/heads/master
Commit: 180dd5648ab604b6396d91ba69a2f777f19bf79c
Parents: b6e59ec
Author: Sudheesh Katkam <su...@apache.org>
Authored: Wed Jan 25 19:03:52 2017 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Feb 24 19:01:43 2017 -0800
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 1 +
.../exec/rpc/AbstractServerConnection.java | 2 +-
.../drill/exec/rpc/BitConnectionConfig.java | 98 +++++++++++
.../rpc/control/ConnectionManagerRegistry.java | 38 ++---
.../drill/exec/rpc/control/ControlClient.java | 165 +++++++++++++++----
.../exec/rpc/control/ControlConnection.java | 80 +++++----
.../rpc/control/ControlConnectionConfig.java | 48 ++++++
.../rpc/control/ControlConnectionManager.java | 43 ++---
.../exec/rpc/control/ControlRpcConfig.java | 4 +-
.../drill/exec/rpc/control/ControlServer.java | 63 +++----
.../drill/exec/rpc/control/ControlTunnel.java | 9 +-
.../drill/exec/rpc/control/Controller.java | 3 +-
.../drill/exec/rpc/control/ControllerImpl.java | 29 ++--
.../rpc/control/DefaultInstanceHandler.java | 3 +
.../org/apache/drill/exec/work/WorkManager.java | 3 +-
.../exec/work/batch/ControlMessageHandler.java | 37 +++--
.../work/fragment/NonRootFragmentManager.java | 2 +-
.../src/main/resources/drill-module.conf | 3 +-
18 files changed, 448 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index b8f0c23..460702a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -114,6 +114,7 @@ public interface ExecConstants {
String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
String BIT_AUTHENTICATION_ENABLED = "drill.exec.security.bit.auth.enabled";
String BIT_AUTHENTICATION_MECHANISM = "drill.exec.security.bit.auth.mechanism";
+ String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
/** Size of JDBC batch queue (in batches) above which throttling begins. */
String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
index 72af064..db87bfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractServerConnection.java
@@ -87,7 +87,7 @@ public abstract class AbstractServerConnection<S extends ServerConnection<S>>
}
@Override
- public void finalizeSession() throws IOException {
+ public void finalizeSaslSession() throws IOException {
final String authorizationID = getSaslServer().getAuthorizationID();
final String remoteShortName = new HadoopKerberosName(authorizationID).getShortName();
final String localShortName = UserGroupInformation.getLoginUser().getShortUserName();
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
new file mode 100644
index 0000000..71e5a86
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitConnectionConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.common.KerberosUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.security.AuthStringUtil;
+import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base configuration for bit-to-bit (Drillbit to Drillbit) connections.
+ * <p>
+ * At construction time this reads the bit authentication settings from the boot config and, when
+ * authentication is enabled, verifies that the configured mechanism can be resolved by the auth provider.
+ */
+public abstract class BitConnectionConfig extends AbstractConnectionConfig {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionConfig.class);
+
+ // Mechanism name read from BIT_AUTHENTICATION_MECHANISM; null when bit authentication is disabled.
+ private final String authMechanismToUse;
+ // Value of USE_LOGIN_PRINCIPAL; selects which service principal getSaslClientProperties reports.
+ private final boolean useLoginPrincipal;
+
+ /**
+ * @param allocator passed through to the parent config
+ * @param context bootstrap context whose DrillConfig supplies the authentication settings
+ * @throws DrillbitStartupException if authentication is enabled but the configured mechanism
+ * cannot be resolved by the auth provider
+ */
+ protected BitConnectionConfig(BufferAllocator allocator, BootStrapContext context) throws DrillbitStartupException {
+ super(allocator, context);
+
+ final DrillConfig config = context.getConfig();
+ if (config.getBoolean(ExecConstants.BIT_AUTHENTICATION_ENABLED)) {
+ this.authMechanismToUse = config.getString(ExecConstants.BIT_AUTHENTICATION_MECHANISM);
+ try {
+ // Lookup is done only for validation; the returned factory is discarded.
+ getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+ } catch (final SaslException e) {
+ // Keep the SaslException as the cause so the root failure is not lost at startup.
+ throw new DrillbitStartupException(String.format(
+ "'%s' mechanism not found for bit-to-bit authentication. Please check authentication configuration.",
+ authMechanismToUse), e);
+ }
+ logger.info("Configured bit-to-bit connections to require authentication using: {}", authMechanismToUse);
+ } else {
+ this.authMechanismToUse = null;
+ }
+ this.useLoginPrincipal = config.getBoolean(ExecConstants.USE_LOGIN_PRINCIPAL);
+ }
+
+ /**
+ * @return the configured authentication mechanism, or null iff authentication is disabled
+ */
+ public String getAuthMechanismToUse() {
+ return authMechanismToUse;
+ }
+
+ /**
+ * Convenience method that resolves the authenticator factory for the configured mechanism after
+ * checking that the remote side lists it.
+ *
+ * @param remoteMechanisms mechanisms advertised by the remote
+ * @return the factory for the locally configured mechanism
+ * @throws SaslException if authentication is disabled locally, or the remote does not list the
+ * configured mechanism
+ */
+ public AuthenticatorFactory getAuthFactory(final List<String> remoteMechanisms) throws SaslException {
+ if (authMechanismToUse == null) {
+ throw new SaslException("Authentication is not enabled");
+ }
+ if (!AuthStringUtil.listContains(remoteMechanisms, authMechanismToUse)) {
+ throw new SaslException(String.format("Remote does not support authentication using '%s'", authMechanismToUse));
+ }
+ return getAuthProvider().getAuthenticatorFactory(authMechanismToUse);
+ }
+
+ /**
+ * Builds the properties handed to the SASL client when connecting to the given remote.
+ * <p>
+ * Only when the login user was authenticated through Kerberos is SERVICE_PRINCIPAL set: either to the
+ * login principal itself (use_login_principal is true), or to a principal assembled from the login
+ * short name, the remote endpoint address and the login realm. Otherwise the map is left empty.
+ *
+ * @param remoteEndpoint endpoint being connected to; its address is used as the principal's host part
+ * @return the properties as a string map
+ * @throws IOException if the login user cannot be obtained or the principal cannot be parsed
+ */
+ public Map<String, ?> getSaslClientProperties(final DrillbitEndpoint remoteEndpoint) throws IOException {
+ final DrillProperties properties = DrillProperties.createEmpty();
+
+ final UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS) {
+ final HadoopKerberosName loginPrincipal = new HadoopKerberosName(loginUser.getUserName());
+ if (!useLoginPrincipal) {
+ properties.setProperty(DrillProperties.SERVICE_PRINCIPAL,
+ KerberosUtil.getPrincipalFromParts(loginPrincipal.getShortName(),
+ remoteEndpoint.getAddress(),
+ loginPrincipal.getRealm()));
+ } else {
+ properties.setProperty(DrillProperties.SERVICE_PRINCIPAL, loginPrincipal.toString());
+ }
+ }
+ return properties.stringPropertiesAsMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 1ac30e7..800cf3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,10 +20,7 @@ package org.apache.drill.exec.rpc.control;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
import com.google.common.collect.Maps;
@@ -32,24 +29,21 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
private final ConcurrentMap<DrillbitEndpoint, ControlConnectionManager> registry = Maps.newConcurrentMap();
- private final ControlMessageHandler handler;
- private final BootStrapContext context;
- private volatile DrillbitEndpoint localEndpoint;
- private final BufferAllocator allocator;
+ private final ControlConnectionConfig config;
- public ConnectionManagerRegistry(BufferAllocator allocator, ControlMessageHandler handler, BootStrapContext context) {
- super();
- this.handler = handler;
- this.context = context;
- this.allocator = allocator;
+ private DrillbitEndpoint localEndpoint;
+
+ public ConnectionManagerRegistry(ControlConnectionConfig config) {
+ this.config = config;
}
- public ControlConnectionManager getConnectionManager(DrillbitEndpoint endpoint) {
- assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
- ControlConnectionManager m = registry.get(endpoint);
+ public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) {
+ assert localEndpoint != null :
+ "DrillbitEndpoint must be set before a connection manager can be retrieved";
+ ControlConnectionManager m = registry.get(remoteEndpoint);
if (m == null) {
- m = new ControlConnectionManager(allocator, endpoint, localEndpoint, handler, context);
- ControlConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+ m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint);
+ final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m);
if (m2 != null) {
m = m2;
}
@@ -58,13 +52,13 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
return m;
}
+ // Records the local endpoint; getConnectionManager asserts it is non-null before creating managers.
+ // NOTE(review): this commit drops `volatile` from the backing field - confirm the endpoint is set
+ // before other threads can call getConnectionManager.
+ void setLocalEndpoint(final DrillbitEndpoint endpoint) {
+ this.localEndpoint = endpoint;
+ }
+
@Override
public Iterator<ControlConnectionManager> iterator() {
return registry.values().iterator();
}
- public void setEndpoint(DrillbitEndpoint endpoint) {
- this.localEndpoint = endpoint;
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index c5bf6b5..6ebe1c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.control;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
@@ -27,54 +28,54 @@ import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcCommand;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.FailingRequestHandler;
import com.google.protobuf.MessageLite;
+import org.apache.hadoop.security.UserGroupInformation;
-public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
+public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);
- private final ControlMessageHandler handler;
private final DrillbitEndpoint remoteEndpoint;
private volatile ControlConnection connection;
private final ControlConnectionManager.CloseHandlerCreator closeHandlerFactory;
- private final DrillbitEndpoint localIdentity;
- private final BufferAllocator allocator;
-
- public ControlClient(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint,
- ControlMessageHandler handler,
- BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
- super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- allocator.getAsByteBufAllocator(),
- context.getBitLoopGroup(),
+ private final ControlConnectionConfig config;
+
+ public ControlClient(ControlConnectionConfig config, DrillbitEndpoint remoteEndpoint,
+ ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
+ super(ControlRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+ config.getBootstrapContext().getExecutor()),
+ config.getAllocator().getAsByteBufAllocator(),
+ config.getBootstrapContext().getBitLoopGroup(),
RpcType.HANDSHAKE,
BitControlHandshake.class,
BitControlHandshake.PARSER);
- this.localIdentity = localEndpoint;
+ this.config = config;
this.remoteEndpoint = remoteEndpoint;
- this.handler = handler;
this.closeHandlerFactory = closeHandlerFactory;
- this.allocator = context.getAllocator();
- }
-
- public void connect(RpcConnectionHandler<ControlConnection> connectionHandler) {
- connectAsClient(connectionHandler, BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort());
}
@SuppressWarnings("unchecked")
@Override
- public ControlConnection initRemoteConnection(SocketChannel channel) {
+ protected ControlConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- this.connection = new ControlConnection("control client", channel,
- (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
+ connection = new ControlConnection(channel, "control client", config,
+ config.getAuthMechanismToUse() == null
+ ? config.getMessageHandler()
+ : new FailingRequestHandler<ControlConnection>(),
+ this);
return connection;
}
@@ -89,14 +90,36 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
}
@Override
- protected Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handler.handle(connection, rpcType, pBody, dBody);
+ protected void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
+ connection.getCurrentHandler().handle(connection, rpcType, pBody, dBody, sender);
}
+ /**
+ * Validates the handshake received from the remote Drillbit and, when the remote advertises
+ * authentication mechanisms, creates the SASL client and stores it on the connection.
+ *
+ * @throws RpcException if the RPC versions differ, the SASL client cannot be created, or the remote
+ * advertises no mechanisms while authentication is enabled locally
+ */
@Override
protected void validateHandshake(BitControlHandshake handshake) throws RpcException {
if (handshake.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
+ // Expected is the local version; actual is the version the remote sent in its handshake.
+ throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
+ ControlRpcConfig.RPC_VERSION, handshake.getRpcVersion()));
+ }
+
+ if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
+ final SaslClient saslClient;
+ try {
+ saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
+ .createSaslClient(UserGroupInformation.getLoginUser(),
+ config.getSaslClientProperties(remoteEndpoint));
+ } catch (final IOException e) {
+ throw new RpcException(String.format("Failed to initiate authentication to %s", remoteEndpoint.getAddress()), e);
+ }
+ if (saslClient == null) {
+ throw new RpcException("Unexpected failure. Could not initiate SASL exchange.");
+ }
+ connection.setSaslClient(saslClient);
+ } else {
+ if (config.getAuthMechanismToUse() != null) { // local requires authentication
+ throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
+ remoteEndpoint.getAddress()));
+ }
}
}
@@ -105,8 +128,84 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
connection.setEndpoint(handshake.getEndpoint());
}
- public ControlConnection getConnection() {
- return this.connection;
+ /**
+ * Returns the command to run first on a new connection. When no authentication mechanism is
+ * configured the parent's initial command is returned unchanged; otherwise it is wrapped in an
+ * AuthenticationCommand.
+ */
+ @Override
+ protected <M extends MessageLite> RpcCommand<M, ControlConnection>
+ getInitialCommand(final RpcCommand<M, ControlConnection> command) {
+ final RpcCommand<M, ControlConnection> initialCommand = super.getInitialCommand(command);
+ if (config.getAuthMechanismToUse() == null) {
+ return initialCommand;
+ } else {
+ return new AuthenticationCommand<>(initialCommand);
+ }
+ }
+
+ /**
+ * Wraps another command so that authentication is attempted before the wrapped command is told the
+ * connection succeeded. Every failure is reported to the wrapped command as
+ * FailureType.AUTHENTICATION.
+ */
+ private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, ControlConnection> {
+
+ // The command to notify once authentication has succeeded or failed.
+ private final RpcCommand<M, ControlConnection> command;
+
+ AuthenticationCommand(RpcCommand<M, ControlConnection> command) {
+ this.command = command;
+ }
+
+ // Not expected to be called for this command; reports a failure to the wrapped command if it is.
+ @Override
+ public void connectionAvailable(ControlConnection connection) {
+ command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here."));
+ }
+
+ /**
+ * Starts authentication using the configured mechanism and waits for its outcome before
+ * forwarding success or failure to the wrapped command.
+ */
+ @Override
+ public void connectionSucceeded(final ControlConnection connection) {
+ final UserGroupInformation loginUser;
+ try {
+ loginUser = UserGroupInformation.getLoginUser();
+ } catch (final IOException e) {
+ logger.debug("Unexpected failure trying to login.", e);
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ return;
+ }
+
+ // The listener below completes this future; the code after initiate() blocks on it.
+ final SettableFuture<Void> future = SettableFuture.create();
+ new AuthenticationOutcomeListener<>(ControlClient.this, connection, RpcType.SASL_MESSAGE,
+ loginUser,
+ new RpcOutcomeListener<Void>() {
+ @Override
+ public void failed(RpcException ex) {
+ logger.debug("Authentication failed.", ex);
+ future.setException(ex);
+ }
+
+ @Override
+ public void success(Void value, ByteBuf buffer) {
+ // Authentication succeeded: switch the connection to the configured message handler.
+ connection.changeHandlerTo(config.getMessageHandler());
+ future.set(null);
+ }
+
+ @Override
+ public void interrupted(InterruptedException e) {
+ logger.debug("Authentication failed.", e);
+ future.setException(e);
+ }
+ }).initiate(config.getAuthMechanismToUse());
+
+
+ try {
+ logger.trace("Waiting until authentication completes..");
+ // NOTE(review): this blocks the calling thread until the listener completes the future -
+ // confirm which thread invokes connectionSucceeded.
+ future.get();
+ command.connectionSucceeded(connection);
+ } catch (InterruptedException e) {
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ // The ExecutionException wrapper itself (not its cause) is handed to the wrapped command.
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ }
+ }
+
+ // Forwards the failure to the wrapped command, replacing the incoming type with AUTHENTICATION.
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ logger.debug("Authentication failed.", t);
+ command.connectionFailed(FailureType.AUTHENTICATION, t);
+ }
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
index 179a2f4..a50a3b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnection.java
@@ -17,35 +17,40 @@
*/
package org.apache.drill.exec.rpc.control;
+import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.SocketChannel;
-
-import java.util.UUID;
-
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.AbstractServerConnection;
+import org.apache.drill.exec.rpc.ClientConnection;
+import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.slf4j.Logger;
-import com.google.protobuf.MessageLite;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkState;
-public class ControlConnection extends RemoteConnection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnection.class);
+public class ControlConnection extends AbstractServerConnection<ControlConnection> implements ClientConnection {
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnection.class);
private final RpcBus<RpcType, ControlConnection> bus;
- private final BufferAllocator allocator;
+ private final UUID id;
+
private volatile DrillbitEndpoint endpoint;
private volatile boolean active = false;
- private final UUID id;
- public ControlConnection(String name, SocketChannel channel, RpcBus<RpcType, ControlConnection> bus,
- BufferAllocator allocator) {
- super(channel, name);
+ private SaslClient saslClient;
+
+ ControlConnection(SocketChannel channel, String name, ControlConnectionConfig config,
+ RequestHandler<ControlConnection> handler, RpcBus<RpcType, ControlConnection> bus) {
+ super(channel, name, config, handler);
this.bus = bus;
this.id = UUID.randomUUID();
- this.allocator = allocator;
}
void setEndpoint(DrillbitEndpoint endpoint) {
@@ -54,24 +59,18 @@ public class ControlConnection extends RemoteConnection {
active = true;
}
- protected DrillbitEndpoint getEndpoint() {
- return endpoint;
- }
-
- public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener,
- RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+ public <SEND extends MessageLite, RECEIVE extends MessageLite>
+ void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody,
+ Class<RECEIVE> clazz, ByteBuf... dataBodies) {
bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
}
- public <SEND extends MessageLite, RECEIVE extends MessageLite> void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener,
- RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+ public <SEND extends MessageLite, RECEIVE extends MessageLite>
+ void sendUnsafe(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody,
+ Class<RECEIVE> clazz, ByteBuf... dataBodies) {
bus.send(outcomeListener, this, rpcType, protobufBody, clazz, true, dataBodies);
}
- public void disable() {
- active = false;
- }
-
@Override
public boolean isActive() {
return active;
@@ -108,8 +107,33 @@ public class ControlConnection extends RemoteConnection {
}
@Override
- public BufferAllocator getAllocator() {
- return allocator;
+ protected Logger getLogger() {
+ return logger;
+ }
+
+ /**
+ * Stores the SASL client for this connection. The checkState call rejects a second assignment
+ * while a client is already set.
+ */
+ @Override
+ public void setSaslClient(final SaslClient saslClient) {
+ checkState(this.saslClient == null);
+ this.saslClient = saslClient;
+ }
+
+ /**
+ * Returns the SASL client previously stored with setSaslClient. The checkState call rejects the
+ * call when no client has been set (or it was already disposed by close).
+ */
+ @Override
+ public SaslClient getSaslClient() {
+ checkState(saslClient != null);
+ return saslClient;
+ }
+
+ /**
+ * Disposes the SASL client, if one is set, and then closes the connection via the parent class.
+ * A SaslException from disposal is logged as a warning and does not prevent the parent close.
+ */
+ @Override
+ public void close() {
+ try {
+ if (saslClient != null) {
+ saslClient.dispose();
+ // Cleared only after a successful dispose; if dispose throws, the reference is kept.
+ saslClient = null;
+ }
+ } catch (final SaslException e) {
+ getLogger().warn("Unclean disposal", e);
+ }
+ super.close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
new file mode 100644
index 0000000..b19fb8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.BitConnectionConfig;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+
+// Config for bit-to-bit control connections: the shared BitConnectionConfig settings plus the
+// ControlMessageHandler that handles control messages.
+// package private
+class ControlConnectionConfig extends BitConnectionConfig {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class);
+
+ // Handler returned by getMessageHandler().
+ private final ControlMessageHandler handler;
+
+ /**
+ * @param allocator passed through to BitConnectionConfig
+ * @param context passed through to BitConnectionConfig
+ * @param handler handler exposed through getMessageHandler()
+ * @throws DrillbitStartupException propagated from the BitConnectionConfig constructor
+ */
+ ControlConnectionConfig(BufferAllocator allocator, BootStrapContext context, ControlMessageHandler handler)
+ throws DrillbitStartupException {
+ super(allocator, context);
+ this.handler = handler;
+ }
+
+ @Override
+ public String getName() {
+ return "control"; // unused
+ }
+
+ // Returns the handler supplied at construction.
+ ControlMessageHandler getMessageHandler() {
+ return handler;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 611b727..b31ffa7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -17,13 +17,10 @@
*/
package org.apache.drill.exec.rpc.control;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.ReconnectingConnection;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
/**
* Maintains connection between two particular bits.
@@ -31,34 +28,26 @@ import org.apache.drill.exec.work.batch.ControlMessageHandler;
public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
- private final DrillbitEndpoint endpoint;
- private final ControlMessageHandler handler;
- private final BootStrapContext context;
- private final DrillbitEndpoint localIdentity;
- private final BufferAllocator allocator;
-
- public ControlConnectionManager(BufferAllocator allocator, DrillbitEndpoint remoteEndpoint,
- DrillbitEndpoint localIdentity, ControlMessageHandler handler, BootStrapContext context) {
- super(BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort());
- assert remoteEndpoint != null : "Endpoint cannot be null.";
- assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
- assert remoteEndpoint.getControlPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k. Was set to %d.", remoteEndpoint.getControlPort());
-
- this.allocator = allocator;
- this.endpoint = remoteEndpoint;
- this.localIdentity = localIdentity;
- this.handler = handler;
- this.context = context;
+ private final ControlConnectionConfig config;
+ private final DrillbitEndpoint remoteEndpoint;
+
+ /**
+ * Builds the handshake sent to the remote (RPC version plus the local endpoint) and targets the
+ * remote endpoint's address and control port.
+ * NOTE(review): the asserts on remoteEndpoint (non-null, non-empty address, positive control port)
+ * present before this commit were removed - confirm callers never pass an invalid endpoint.
+ *
+ * @param config connection configuration handed to each new ControlClient
+ * @param localEndpoint endpoint placed in the outgoing handshake
+ * @param remoteEndpoint endpoint to connect to
+ */
+ public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint,
+ DrillbitEndpoint remoteEndpoint) {
+ super(
+ BitControlHandshake.newBuilder()
+ .setRpcVersion(ControlRpcConfig.RPC_VERSION)
+ .setEndpoint(localEndpoint)
+ .build(),
+ remoteEndpoint.getAddress(),
+ remoteEndpoint.getControlPort());
+
+ this.config = config;
+ this.remoteEndpoint = remoteEndpoint;
}
@Override
protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() {
- return new ControlClient(allocator, endpoint, localIdentity, handler, context, new CloseHandlerCreator());
- }
-
-
- public DrillbitEndpoint getEndpoint() {
- return endpoint;
+ return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index ec09a98..562cd3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcConfig;
@@ -52,10 +53,11 @@ public class ControlRpcConfig {
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
.add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CUSTOM, CustomMessage.class, RpcType.RESP_CUSTOM, CustomMessage.class)
+ .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class)
.build();
}
- public static int RPC_VERSION = 3;
+ public static final int RPC_VERSION = 3;
public static final Response OK = new Response(RpcType.ACK, Acks.OK);
public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index a786469..9e733df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.rpc.control;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
@@ -28,29 +27,25 @@ import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.batch.ControlMessageHandler;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
import com.google.protobuf.MessageLite;
public class ControlServer extends BasicServer<RpcType, ControlConnection>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlServer.class);
- private final ControlMessageHandler handler;
+ private final ControlConnectionConfig config;
private final ConnectionManagerRegistry connectionRegistry;
private volatile ProxyCloseHandler proxyCloseHandler;
- private BufferAllocator allocator;
-
- public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
- super(
- ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getAsByteBufAllocator(),
- context.getBitLoopGroup());
- this.handler = handler;
+
+ public ControlServer(ControlConnectionConfig config, ConnectionManagerRegistry connectionRegistry) {
+ super(ControlRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+ config.getBootstrapContext().getExecutor()),
+ config.getAllocator().getAsByteBufAllocator(),
+ config.getBootstrapContext().getBitLoopGroup());
+ this.config = config;
this.connectionRegistry = connectionRegistry;
- this.allocator = context.getAllocator();
}
@Override
@@ -59,20 +54,20 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
}
@Override
- protected Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handler.handle(connection, rpcType, pBody, dBody);
- }
-
- @Override
protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
return proxyCloseHandler;
}
@Override
- public ControlConnection initRemoteConnection(SocketChannel channel) {
+ protected ControlConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- return new ControlConnection("control server", channel, this, allocator);
+ return new ControlConnection(channel, "control server", config,
+ config.getAuthMechanismToUse() == null
+ ? config.getMessageHandler()
+ : new ServerAuthenticationHandler<>(config.getMessageHandler(),
+ RpcType.SASL_MESSAGE_VALUE, RpcType.SASL_MESSAGE),
+ this);
}
@@ -84,10 +79,14 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
public MessageLite getHandshakeResponse(BitControlHandshake inbound) throws Exception {
// logger.debug("Handling handshake from other bit. {}", inbound);
if (inbound.getRpcVersion() != ControlRpcConfig.RPC_VERSION) {
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), ControlRpcConfig.RPC_VERSION));
+ throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
+ ControlRpcConfig.RPC_VERSION, inbound.getRpcVersion())); // expected = local version, actual = peer's
}
- if (!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) {
- throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint()));
+ if (!inbound.hasEndpoint() ||
+ inbound.getEndpoint().getAddress().isEmpty() ||
+ inbound.getEndpoint().getControlPort() < 1) {
+ throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.",
+ inbound.getEndpoint()));
}
connection.setEndpoint(inbound.getEndpoint());
@@ -95,19 +94,25 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
ControlConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
// update the close handler.
- proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+ proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection,
+ proxyCloseHandler.getHandler()));
// add to the connection manager.
manager.addExternalConnection(connection);
- return BitControlHandshake.newBuilder().setRpcVersion(ControlRpcConfig.RPC_VERSION).build();
+ final BitControlHandshake.Builder builder = BitControlHandshake.newBuilder();
+ builder.setRpcVersion(ControlRpcConfig.RPC_VERSION);
+ if (config.getAuthMechanismToUse() != null) {
+ builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
+ }
+ return builder.build();
}
};
}
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 9b46a7a..bb0fda3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -57,15 +56,9 @@ public class ControlTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
private final ControlConnectionManager manager;
- private final DrillbitEndpoint endpoint;
- public ControlTunnel(DrillbitEndpoint endpoint, ControlConnectionManager manager) {
+ public ControlTunnel(ControlConnectionManager manager) {
this.manager = manager;
- this.endpoint = endpoint;
- }
-
- public DrillbitEndpoint getEndpoint(){
- return manager.getEndpoint();
}
public void sendFragments(RpcOutcomeListener<Ack> outcomeListener, InitializeFragments fragments){
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index a5f470c..6b2ee4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -44,7 +44,8 @@ public interface Controller extends AutoCloseable {
*/
public ControlTunnel getTunnel(DrillbitEndpoint node) ;
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting)
+ throws DrillbitStartupException;
/**
* Register a new handler for custom message types. Should be done before any messages. This is threadsafe as this
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index 482f117..4991816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,39 +36,34 @@ import com.google.protobuf.Parser;
* Manages communication tunnels between nodes.
*/
public class ControllerImpl implements Controller {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControllerImpl.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControllerImpl.class);
private volatile ControlServer server;
- private final ControlMessageHandler handler;
- private final BootStrapContext context;
private final ConnectionManagerRegistry connectionRegistry;
- private final boolean allowPortHunting;
private final CustomHandlerRegistry handlerRegistry;
+ private final ControlConnectionConfig config;
- public ControllerImpl(BootStrapContext context, ControlMessageHandler handler, BufferAllocator allocator,
- boolean allowPortHunting) {
- super();
- this.handler = handler;
- this.context = context;
- this.connectionRegistry = new ConnectionManagerRegistry(allocator, handler, context);
- this.allowPortHunting = allowPortHunting;
+ public ControllerImpl(BootStrapContext context, BufferAllocator allocator, ControlMessageHandler handler)
+ throws DrillbitStartupException {
+ config = new ControlConnectionConfig(allocator, context, handler);
+ this.connectionRegistry = new ConnectionManagerRegistry(config);
this.handlerRegistry = handler.getHandlerRegistry();
}
@Override
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
- server = new ControlServer(handler, context, connectionRegistry);
- int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, final boolean allowPortHunting) {
+ server = new ControlServer(config, connectionRegistry);
+ int port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
port = server.bind(port, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setControlPort(port).build();
- connectionRegistry.setEndpoint(completeEndpoint);
+ connectionRegistry.setLocalEndpoint(completeEndpoint);
handlerRegistry.setEndpoint(completeEndpoint);
return completeEndpoint;
}
@Override
public ControlTunnel getTunnel(DrillbitEndpoint endpoint) {
- return new ControlTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
+ return new ControlTunnel(connectionRegistry.getConnectionManager(endpoint));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
index 7065201..5360cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/DefaultInstanceHandler.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
import org.apache.drill.exec.rpc.RpcException;
import com.google.protobuf.MessageLite;
@@ -49,6 +50,8 @@ public class DefaultInstanceHandler {
return QueryProfile.getDefaultInstance();
case RpcType.RESP_CUSTOM_VALUE:
return CustomMessage.getDefaultInstance();
+ case RpcType.SASL_MESSAGE_VALUE:
+ return SaslMessage.getDefaultInstance();
default:
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 697616e..c352861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -260,7 +260,8 @@ public class WorkManager implements AutoCloseable {
/**
* Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
- * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataServer#handle}
+ * called when the first batch arrives.
+ *
* @param fragmentManager the manager for the fragment
*/
public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 77c069b..58c1df5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,7 +34,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
@@ -50,7 +52,7 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-public class ControlMessageHandler {
+public class ControlMessageHandler implements RequestHandler<ControlConnection> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
private final WorkerBee bee;
private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry();
@@ -59,8 +61,9 @@ public class ControlMessageHandler {
this.bee = bee;
}
- public Response handle(final ControlConnection connection, final int rpcType,
- final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+ @Override
+ public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received bit com message of type {}", rpcType);
}
@@ -70,34 +73,39 @@ public class ControlMessageHandler {
case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
cancelFragment(handle);
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
+ break;
}
case RpcType.REQ_CUSTOM_VALUE: {
final CustomMessage customMessage = get(pBody, CustomMessage.PARSER);
- return handlerRegistry.handle(customMessage, (DrillBuf) dBody);
+ sender.send(handlerRegistry.handle(customMessage, (DrillBuf) dBody));
+ break;
}
case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
receivingFragmentFinished(finishedReceiver);
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
+ break;
}
case RpcType.REQ_FRAGMENT_STATUS_VALUE:
bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
// TODO: Support a type of message that has no response.
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
+ break;
case RpcType.REQ_QUERY_CANCEL_VALUE: {
final QueryId queryId = get(pBody, QueryId.PARSER);
final Foreman foreman = bee.getForemanForQueryId(queryId);
if (foreman != null) {
foreman.cancel();
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
} else {
- return ControlRpcConfig.FAIL;
+ sender.send(ControlRpcConfig.FAIL);
}
+ break;
}
case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
@@ -105,7 +113,8 @@ public class ControlMessageHandler {
for(int i = 0; i < fragments.getFragmentCount(); i++) {
startNewRemoteFragment(fragments.getFragment(i));
}
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
+ break;
}
case RpcType.REQ_QUERY_STATUS_VALUE: {
@@ -115,13 +124,15 @@ public class ControlMessageHandler {
throw new RpcException("Query not running on node.");
}
final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
- return new Response(RpcType.RESP_QUERY_STATUS, profile);
+ sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile));
+ break;
}
case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
resumeFragment(handle);
- return ControlRpcConfig.OK;
+ sender.send(ControlRpcConfig.OK);
+ break;
}
default:
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 3e7a693..7cffa0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -62,7 +62,7 @@ public class NonRootFragmentManager implements FragmentManager {
}
/* (non-Javadoc)
- * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
+ * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.AbstractRemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
*/
@Override
public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
http://git-wip-us.apache.org/repos/asf/drill/blob/180dd564/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index ecf6f6a..9c2ba2f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -149,7 +149,8 @@ drill.exec: {
enabled: false
},
security.bit.auth {
- enabled : false
+ enabled: false
+ use_login_principal: false
}
trace: {
directory: "/tmp/drill-trace",