You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:42 UTC
[21/53] [abbrv] Clean up threading of client/server. Utilize command
pattern for BitCom stuff to abstract away connection failures. Works on one
bit single exchange remote query now. Next up, two bit single exchange query.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index 4ba99a1..82a6aa6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -22,57 +22,54 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
import com.google.protobuf.MessageLite;
-public class BitClient extends BasicClient<RpcType, BitConnection>{
+public class BitClient extends BasicClient<RpcType, BitConnection, BitHandshake, BitHandshake>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
private final BitComHandler handler;
- private final DrillbitEndpoint endpoint;
- private BitConnection connection;
- private final AvailabilityListener openListener;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+ private final DrillbitEndpoint remoteEndpoint;
+ private volatile BitConnection connection;
private final ListenerPool listeners;
+ private final CloseHandlerCreator closeHandlerFactory;
+ private final DrillbitEndpoint localIdentity;
- public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
- super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
-
- this.endpoint = endpoint;
+ public BitClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, BitComHandler handler, BootStrapContext context, CloseHandlerCreator closeHandlerFactory, ListenerPool listeners) {
+ super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER);
+ this.localIdentity = localEndpoint;
+ this.remoteEndpoint = remoteEndpoint;
this.handler = handler;
- this.openListener = openListener;
- this.registry = registry;
this.listeners = listeners;
+ this.closeHandlerFactory = closeHandlerFactory;
}
- public BitHandshake connect() throws RpcException, InterruptedException{
- BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
- connection.setEndpoint(endpoint);
- return bs;
+ public void connect(RpcConnectionHandler<BitConnection> connectionHandler) {
+ connectAsClient(connectionHandler, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getBitPort());
}
@SuppressWarnings("unchecked")
@Override
public BitConnection initRemoteConnection(Channel channel) {
- this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+ this.connection = new BitConnection(channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, listeners);
return connection;
}
@Override
protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
- return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
}
@Override
@@ -86,18 +83,15 @@ public class BitClient extends BasicClient<RpcType, BitConnection>{
}
@Override
- protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
- return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
-
- @Override
- protected void validateHandshake(BitHandshake inbound) throws Exception {
- logger.debug("Handling handshake from bit server to bit client. {}", inbound);
- if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
- }
+ protected void validateHandshake(BitHandshake handshake) throws RpcException {
+ if(handshake.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+ }
- };
+ @Override
+ protected void finalizeConnection(BitHandshake handshake, BitConnection connection) {
+ connection.setEndpoint(handshake.getEndpoint());
}
-
+
public BitConnection getConnection(){
return this.connection;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index c60d36b..f7f508e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -40,11 +40,17 @@ public interface BitCom extends Closeable {
*/
public BitTunnel getTunnel(DrillbitEndpoint node) ;
- public int start() throws InterruptedException, DrillbitStartupException;
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
/**
* Register an incoming batch handler for a local foreman.
* @param handler
*/
public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
+
+ /**
+ * Get ListenerPool
+ * @return
+ */
+ public ListenerPool getListeners();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index c98be44..d1cadc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -18,157 +18,68 @@
package org.apache.drill.exec.rpc.bit;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
/**
- * Manages communication tunnels between nodes.
+ * Manages communication tunnels between nodes.
*/
public class BitComImpl implements BitCom {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
private final ListenerPool listeners;
private volatile BitServer server;
private final BitComHandler handler;
private final BootStrapContext context;
-
- // TODO: this executor should be removed.
- private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
+ private final ConnectionManagerRegistry connectionRegistry;
public BitComImpl(BootStrapContext context, BitComHandler handler) {
super();
this.handler = handler;
this.context = context;
this.listeners = new ListenerPool(8);
+ this.connectionRegistry = new ConnectionManagerRegistry(handler, context, listeners);
}
- public int start() throws InterruptedException, DrillbitStartupException {
- server = new BitServer(handler, context, registry, listeners);
+ @Override
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+ server = new BitServer(handler, context, connectionRegistry, listeners);
int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
- return server.bind(port);
- }
-
- private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
-
-
- SettableFuture<BitConnection> future = SettableFuture.create();
- BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
- BitConnection t = null;
-
- if (check) {
- t = registry.get(endpoint);
-
- if (t != null) {
- future.set(t);
- return checkedFuture;
- }
- }
-
- try {
- AvailWatcher watcher = new AvailWatcher(future);
- BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
- c.connect();
- return checkedFuture;
- } catch (InterruptedException | RpcException e) {
- future.setException(new FragmentSetupException("Unable to open connection"));
- return checkedFuture;
- }
-
+ port = server.bind(port);
+ DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setBitPort(port).build();
+ connectionRegistry.setEndpoint(completeEndpoint);
+ return completeEndpoint;
}
- private class AvailWatcher implements AvailabilityListener{
- final SettableFuture<BitConnection> future;
-
- public AvailWatcher(SettableFuture<BitConnection> future) {
- super();
- this.future = future;
- }
-
- @Override
- public void isAvailable(BitConnection connection) {
- future.set(connection);
- }
-
- }
- BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
- BitConnection t = registry.get(endpoint);
- if(t != null) return t;
- return this.getNode(endpoint, false).checkedGet();
+
+ public ListenerPool getListeners() {
+ return listeners;
}
-
- CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
- return this.getNode(endpoint, true);
- }
-
-
@Override
- public BitTunnel getTunnel(DrillbitEndpoint endpoint){
- BitConnection t = registry.get(endpoint);
- if(t == null){
- return new BitTunnel(exec, endpoint, this, t);
- }else{
- return new BitTunnel(exec, endpoint, this, this.getNode(endpoint, false));
- }
+ public BitTunnel getTunnel(DrillbitEndpoint endpoint) {
+ return new BitTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
}
-
- /**
- * A future which remaps exceptions to a BitComException.
- * @param <T>
- */
- private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
-
- protected BitComFuture(ListenableFuture<T> delegate) {
- super(delegate);
- }
-
- @Override
- protected RpcException mapException(Exception e) {
- Throwable t = e;
- if(e instanceof ExecutionException){
- t = e.getCause();
- }
-
- if(t instanceof RpcException) return (RpcException) t;
- return new RpcException(t);
- }
+ @Override
+ public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+ this.handler.registerIncomingFragmentHandler(handler);
}
public void close() {
Closeables.closeQuietly(server);
- for (BitConnection bt : registry.values()) {
- bt.shutdownIfClient();
+ for (BitConnectionManager bt : connectionRegistry) {
+ bt.close();
}
}
- @Override
- public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
- this.handler.registerIncomingFragmentHandler(handler);
- }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
new file mode 100644
index 0000000..692c63e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+
+import com.google.protobuf.MessageLite;
+
+public interface BitCommand<T extends MessageLite> extends RpcConnectionHandler<BitConnection>{
+
+ public abstract void connectionAvailable(BitConnection connection);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
index 73980f9..f85ea74 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -17,6 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
@@ -35,31 +36,35 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
public class BitConnection extends RemoteConnection{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class);
private final RpcBus<RpcType, BitConnection> bus;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
private final ListenerPool listeners;
-
- private final AvailabilityListener listener;
private volatile DrillbitEndpoint endpoint;
private volatile boolean active = false;
private final UUID id;
- public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+ public BitConnection(Channel channel, RpcBus<RpcType, BitConnection> bus, ListenerPool listeners){
super(channel);
this.bus = bus;
- this.registry = registry;
// we use a local listener pool unless a global one is provided.
this.listeners = listeners != null ? listeners : new ListenerPool(2);
- this.listener = listener;
this.id = UUID.randomUUID();
}
+
+ void setEndpoint(DrillbitEndpoint endpoint){
+ assert this.endpoint == null : "Endpoint should only be set once (only in the case in incoming server requests).";
+ this.endpoint = endpoint;
+ active = true;
+ }
protected DrillbitEndpoint getEndpoint() {
return endpoint;
@@ -69,48 +74,12 @@ public class BitConnection extends RemoteConnection{
return listeners;
}
- protected void setEndpoint(DrillbitEndpoint endpoint) {
- Preconditions.checkNotNull(endpoint);
- Preconditions.checkArgument(this.endpoint == null);
-
- this.endpoint = endpoint;
- BitServer.logger.debug("Adding new endpoint to available BitServer connections. Endpoint: {}.", endpoint);
- synchronized(this){
- BitConnection c = registry.putIfAbsent(endpoint, this);
-
- if(c != null){ // the registry already has a connection like this
-
- // give the awaiting future an alternative connection.
- if(listener != null){
- listener.isAvailable(c);
- }
-
- // shut this down if this is a client as it won't be available in the registry.
- // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other. This shouldn't cause a problem.
- logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
- shutdownIfClient();
-
- }
- active = true;
- if(listener != null) listener.isAvailable(this);
- }
- }
-
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
- return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
- }
-
- public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
- return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
- }
- public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
- return bus.send(this, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){
+ bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
}
- public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
- return bus.send(this, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
- }
public void disable(){
active = false;
@@ -140,27 +109,7 @@ public class BitConnection extends RemoteConnection{
return true;
}
- public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
- return new CloseHandler(this, parent);
- }
-
- private class CloseHandler implements GenericFutureListener<ChannelFuture>{
- private BitConnection connection;
- private GenericFutureListener<ChannelFuture> parent;
-
- public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
- super();
- this.connection = connection;
- this.parent = parent;
- }
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
- parent.operationComplete(future);
- }
-
- }
public void shutdownIfClient(){
if(bus.isClient()) Closeables.closeQuietly(bus);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
index 0160d24..d99bb22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -17,58 +17,152 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.protobuf.MessageLite;
-public class BitConnectionManager {
+/**
+ * Manages all connections between two particular bits.
+ */
+public class BitConnectionManager implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
- private final int maxAttempts;
- private final BitComImpl com;
private final DrillbitEndpoint endpoint;
- private final AtomicReference<BitConnection> connection;
- private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+ private final AtomicReference<BitConnection> connectionHolder;
+ private final BitComHandler handler;
+ private final BootStrapContext context;
+ private final ListenerPool listenerPool;
+ private final DrillbitEndpoint localIdentity;
+
+ public BitConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+ assert remoteEndpoint != null : "Endpoint cannot be null.";
+ assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
+ assert remoteEndpoint.getBitPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k. Was set to %d.", remoteEndpoint.getBitPort());
+
+ this.connectionHolder = new AtomicReference<BitConnection>();
+ this.endpoint = remoteEndpoint;
+ this.localIdentity = localIdentity;
+ this.handler = handler;
+ this.context = context;
+ this.listenerPool = listenerPool;
+ }
+
+ public <R extends MessageLite> BitCommand<R> runCommand(BitCommand<R> cmd){
+ logger.debug("Running command {}", cmd);
+ BitConnection connection = connectionHolder.get();
+ if(connection != null){
+ if(connection.isActive()){
+ cmd.connectionAvailable(connection);
+ return cmd;
+ }else{
+ // remove the old connection. (don't worry if we fail since someone else should have done it.)
+ connectionHolder.compareAndSet(connection, null);
+ }
+ }
+
+ /** We've arrived here without a connection, let's make sure only one of us makes a connection. (fyi, another endpoint could create a reverse connection) **/
+ synchronized(this){
+ connection = connectionHolder.get();
+ if(connection != null){
+ cmd.connectionAvailable(connection);
+ }else{
+ BitClient client = new BitClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator(), listenerPool);
+
+ client.connect(new ConnectionListeningDecorator(cmd, !endpoint.equals(localIdentity)));
+ }
+ return cmd;
+
+ }
+ }
+
+ CloseHandlerCreator getCloseHandlerCreator(){
+ return new CloseHandlerCreator();
+ }
- BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
- assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
- this.com = com;
- this.connection = new AtomicReference<BitConnection>(connection);
- this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
- this.endpoint = endpoint;
- this.maxAttempts = maxAttempts;
+ /** Factory for close handlers **/
+ class CloseHandlerCreator{
+ public GenericFutureListener<ChannelFuture> getHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent){
+ return new CloseHandler(connection, parent);
+ }
}
- BitConnection getConnection(int attempt) throws RpcException{
- BitConnection con = connection.get();
+
+
+ /**
+ * Listens for connection closes and clears connection holder.
+ */
+ private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+ private BitConnection connection;
+ private GenericFutureListener<ChannelFuture> parent;
- if(con != null){
- if(con.isActive()) return con;
- connection.compareAndSet(con, null);
+ public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+ super();
+ this.connection = connection;
+ this.parent = parent;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ connectionHolder.compareAndSet(connection, null);
+ parent.operationComplete(future);
}
- CheckedFuture<BitConnection, RpcException> fut = future.get();
+ }
+
+ /**
+ * Decorate a connection creation so that we capture a success and keep it available for future requests. If we have raced and another is already available... we return that one and close things down on this one.
+ */
+ private class ConnectionListeningDecorator implements RpcConnectionHandler<BitConnection>{
+
+ private final RpcConnectionHandler<BitConnection> delegate;
+ private final boolean closeOnDupe;
+
+ public ConnectionListeningDecorator(RpcConnectionHandler<BitConnection> delegate, boolean closeOnDupe) {
+ this.delegate = delegate;
+ this.closeOnDupe = closeOnDupe;
+ }
- if(fut != null){
- try{
- return fut.checkedGet();
- }catch(RpcException ex){
- future.compareAndSet(fut, null);
- if(attempt < maxAttempts){
- return getConnection(attempt + 1);
- }else{
- throw ex;
+ @Override
+ public void connectionSucceeded(BitConnection incoming) {
+ BitConnection connection = connectionHolder.get();
+ while(true){
+ boolean setted = connectionHolder.compareAndSet(null, incoming);
+ if(setted){
+ connection = incoming;
+ break;
}
+ connection = connectionHolder.get();
+ if(connection != null) break;
+ }
+
+
+ if(connection == incoming){
+ delegate.connectionSucceeded(connection);
+ }else{
+
+ if(closeOnDupe){
+ // close the incoming because another channel was created in the meantime (unless this is a self connection).
+ logger.debug("Closing incoming connection because a connection was already set.");
+ incoming.getChannel().close();
+ }
+ delegate.connectionSucceeded(connection);
}
}
-
- // no checked future, let's make one.
- fut = com.getConnectionAsync(endpoint);
- future.compareAndSet(null, fut);
- return getConnection(attempt);
+
+ @Override
+ public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+ delegate.connectionFailed(type, t);
+ }
}
@@ -76,5 +170,20 @@ public class BitConnectionManager {
return endpoint;
}
+ public void addServerConnection(BitConnection connection){
+ // if the connection holder is not set, set it to this incoming connection.
+ logger.debug("Setting server connection.");
+ this.connectionHolder.compareAndSet(null, connection);
+ }
+
+ @Override
+ public void close() {
+ BitConnection c = connectionHolder.getAndSet(null);
+ if(c != null){
+ c.getChannel().close();
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index 88ac6cc..d4665a8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -22,18 +22,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
@@ -43,13 +38,14 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
private final BitComHandler handler;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
private final ListenerPool listeners;
+ private final ConnectionManagerRegistry connectionRegistry;
+ private volatile ProxyCloseHandler proxyCloseHandler;
- public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+ public BitServer(BitComHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry, ListenerPool listeners) {
super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
this.handler = handler;
- this.registry = registry;
+ this.connectionRegistry = connectionRegistry;
this.listeners = listeners;
}
@@ -65,23 +61,36 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
@Override
protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
- return connection.getCloseHandler(super.getCloseHandler(connection));
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ return proxyCloseHandler;
}
@Override
public BitConnection initRemoteConnection(Channel channel) {
- return new BitConnection(null, channel, this, registry, listeners);
+ return new BitConnection(channel, this, listeners);
}
@Override
- protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+ protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler(final BitConnection connection) {
return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
@Override
public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
- logger.debug("Handling handshake from other bit. {}", inbound);
+// logger.debug("Handling handshake from other bit. {}", inbound);
if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+ if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getBitPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint()));
+ connection.setEndpoint(inbound.getEndpoint());
+
+ // add the
+ BitConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
+
+ // update the close handler.
+ proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+
+ // add to the connection manager.
+ manager.addServerConnection(connection);
+
return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
}
@@ -89,5 +98,30 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
}
+ /**
+  * Close listener that forwards to a replaceable delegate, so the behaviour on channel close can be
+  * swapped (see setHandler) after the listener object itself has already been handed out.
+  */
+ private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
+
+   // volatile because the delegate is replaced via setHandler() and read in operationComplete().
+   private volatile GenericFutureListener<ChannelFuture> delegate;
+
+   public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
+     this.delegate = handler;
+   }
+
+   /** Returns the listener currently being delegated to. */
+   public GenericFutureListener<ChannelFuture> getHandler() {
+     return this.delegate;
+   }
+
+   /** Replaces the listener that close events are forwarded to. */
+   public void setHandler(GenericFutureListener<ChannelFuture> handler) {
+     this.delegate = handler;
+   }
+
+   /** Forwards the completion event to whichever delegate is installed at the time of the call. */
+   @Override
+   public void operationComplete(ChannelFuture future) throws Exception {
+     this.delegate.operationComplete(future);
+   }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 652fa52..83b7959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,95 +17,79 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-/**
- * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
- * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
- * and action. A better approach should be done.
- */
public class BitTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
- private static final int MAX_ATTEMPTS = 3;
-
private final BitConnectionManager manager;
- private final Executor exec;
-
+ private final DrillbitEndpoint endpoint;
- public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
- this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
- this.exec = exec;
- }
-
- public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
- CheckedFuture<BitConnection, RpcException> future) {
- this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
- this.exec = exec;
+ public BitTunnel(DrillbitEndpoint endpoint, BitConnectionManager manager) {
+ this.manager = manager;
+ this.endpoint = endpoint;
}
public DrillbitEndpoint getEndpoint(){
return manager.getEndpoint();
}
- private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
- exec.execute(command);
- return command;
- }
-
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
- return submit(new SendBatch(batch, context));
+ public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentContext context, FragmentWritableBatch batch) {
+ SendBatch b = new SendBatch(outcomeListener, batch, context);
+ manager.runCommand(b);
}
- public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
- return submit(new SendFragment(fragment));
+ public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){
+ SendFragment b = new SendFragment(outcomeListener, fragment);
+ manager.runCommand(b);
}
-
- public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
- return submit(new CancelFragment(handle));
+
+ public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+ CancelFragment b = new CancelFragment(handle);
+ manager.runCommand(b);
+ return b.getFuture();
}
-
+
public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
- return submit(new SendFragmentStatus(status));
+ SendFragmentStatus b = new SendFragmentStatus(status);
+ manager.runCommand(b);
+ return b.getFuture();
}
- public class SendBatch extends BitCommand<Ack> {
+ public static class SendBatch extends ListeningBitCommand<Ack> {
final FragmentWritableBatch batch;
final FragmentContext context;
- public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
- super();
+ public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch, FragmentContext context) {
+ super(listener);
this.batch = batch;
this.context = context;
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- logger.debug("Sending record batch. {}", batch);
- return connection.sendRecordBatch(context, batch);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
}
+ @Override
+ public String toString() {
+ return "SendBatch [batch.header=" + batch.getHeader() + "]";
+ }
+
+
}
- public class SendFragmentStatus extends BitCommand<Ack> {
+ public static class SendFragmentStatus extends FutureBitCommand<Ack> {
final FragmentStatus status;
public SendFragmentStatus(FragmentStatus status) {
@@ -114,12 +98,13 @@ public class BitTunnel {
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.sendFragmentStatus(status);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
}
+
}
- public class CancelFragment extends BitCommand<Ack> {
+ public static class CancelFragment extends FutureBitCommand<Ack> {
final FragmentHandle handle;
public CancelFragment(FragmentHandle handle) {
@@ -128,109 +113,23 @@ public class BitTunnel {
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.cancelFragment(handle);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
}
}
- public class SendFragment extends BitCommand<Ack> {
+ public static class SendFragment extends ListeningBitCommand<Ack> {
final PlanFragment fragment;
- public SendFragment(PlanFragment fragment) {
- super();
+ public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) {
+ super(listener);
this.fragment = fragment;
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.sendFragment(fragment);
- }
-
- }
-
-
-
-
- private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
-
- public void addLightListener(RpcOutcomeListener<T> outcomeListener){
- this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
- }
-
- public BitCommand() {
- super(SettableFuture.<T> create());
- }
-
- public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
-
- public final void run() {
-
- try {
-
- BitConnection connection = manager.getConnection(0);
- assert connection != null : "The connection manager should never return a null connection. Worse case, it should throw an exception.";
- CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
- rpc.addListener(new FutureBridge<T>((SettableFuture<T>) delegate(), rpc), MoreExecutors.sameThreadExecutor());
- } catch (RpcException ex) {
- ((SettableFuture<T>) delegate()).setException(ex);
- }
-
- }
-
- @Override
- protected RpcException mapException(Exception e) {
- Throwable t = e;
- if (e instanceof ExecutionException) {
- t = e.getCause();
- }
- if (t instanceof RpcException) return (RpcException) t;
- return new RpcException(t);
- }
-
- public class RpcOutcomeListenerWrapper implements Runnable{
- final RpcOutcomeListener<T> inner;
-
- public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
- this.inner = inner;
- }
-
- @Override
- public void run() {
- try{
- inner.success(BitCommand.this.checkedGet());
- }catch(RpcException e){
- inner.failed(e);
- }
- }
- }
-
- @Override
- public String toString() {
- return "BitCommand ["+this.getClass().getSimpleName()+"]";
- }
-
-
-
- }
-
- private class FutureBridge<T> implements Runnable {
- final SettableFuture<T> out;
- final CheckedFuture<T, RpcException> in;
-
- public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
- super();
- this.out = out;
- this.in = in;
- }
-
- @Override
- public void run() {
- try {
- out.set(in.checkedGet());
- } catch (RpcException ex) {
- out.setException(ex);
- }
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
new file mode 100644
index 0000000..8afbc33
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+
+/**
+ * Registry that hands out one BitConnectionManager per remote DrillbitEndpoint, creating the manager the
+ * first time an endpoint is requested.
+ */
+public class ConnectionManagerRegistry implements Iterable<BitConnectionManager>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
+
+  private final ConcurrentMap<DrillbitEndpoint, BitConnectionManager> registry = Maps.newConcurrentMap();
+
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  // Assigned through setEndpoint(); getConnectionManager() asserts that it has been set.
+  private volatile DrillbitEndpoint localEndpoint;
+
+  public ConnectionManagerRegistry(BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+
+  /**
+   * Returns the connection manager for the given remote endpoint, creating and registering one if none exists.
+   * When two callers create a manager for the same endpoint at once, both receive the instance that was
+   * registered first.
+   */
+  public BitConnectionManager getConnectionManager(DrillbitEndpoint endpoint){
+    assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
+    final BitConnectionManager existing = registry.get(endpoint);
+    if (existing != null) {
+      return existing;
+    }
+
+    final BitConnectionManager created = new BitConnectionManager(endpoint, localEndpoint, handler, context, listenerPool);
+    final BitConnectionManager winner = registry.putIfAbsent(endpoint, created);
+    return winner != null ? winner : created;
+  }
+
+  /** Iterates over the managers currently held by the registry. */
+  @Override
+  public Iterator<BitConnectionManager> iterator() {
+    return registry.values().iterator();
+  }
+
+  /** Records the local endpoint that is passed to every connection manager created afterwards. */
+  public void setEndpoint(DrillbitEndpoint endpoint){
+    this.localEndpoint = endpoint;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
new file mode 100644
index 0000000..fa3b518
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+/**
+ * BitCommand whose outcome is exposed to the caller as a future (see getFuture). Subclasses issue the actual
+ * RPC in doRpcCall; the outcome of that RPC, or a connection failure, completes the future.
+ */
+public abstract class FutureBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
+
+  protected final SettableFuture<T> settableFuture;
+  private final RpcCheckedFuture<T> parentFuture;
+
+  public FutureBitCommand() {
+    this.settableFuture = SettableFuture.create();
+    this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+  }
+
+  /** Issues the RPC on the given connection, reporting its result to the supplied listener. */
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    // Bridge the RPC outcome into the future handed out by getFuture().
+    final RpcOutcomeListener<T> bridge = new RpcOutcomeListener<T>() {
+      @Override
+      public void failed(RpcException ex) {
+        settableFuture.setException(ex);
+      }
+
+      @Override
+      public void success(T value) {
+        settableFuture.set(value);
+      }
+    };
+    doRpcCall(bridge, connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    // A freshly established connection is handled exactly like an already available one.
+    connectionAvailable(connection);
+  }
+
+  /** Returns the future that completes with the RPC response or fails with the mapped RpcException. */
+  public DrillRpcFuture<T> getFuture() {
+    return parentFuture;
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    final String message = String.format("Command failed while establishing connection. Failure type %s.", type);
+    settableFuture.setException(RpcException.mapException(message, t));
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
index 8f299d2..84dba85 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -22,32 +22,35 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.work.foreman.FragmentStatusListener;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
public class ListenerPool {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
- private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+ private final ConcurrentMap<QueryId, FragmentStatusListener> listeners;
public ListenerPool(int par){
- listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+ listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(16, 0.75f, par);
}
public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+ logger.debug("Removing framgent status listener for handle {}.", handle);
listeners.remove(handle);
}
public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
- FragmentStatusListener old = listeners.putIfAbsent(handle, listener);
+ logger.debug("Adding framgent status listener for handle {}.", handle);
+ FragmentStatusListener old = listeners.putIfAbsent(handle.getQueryId(), listener);
if(old != null) throw new RpcException("Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another.");
}
public void status(FragmentStatus status){
- FragmentStatusListener l = listeners.get(status.getHandle());
+ FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
if(l == null){
- logger.info("A fragment message arrived but there was no registered listener for that message.");
+
+ logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", status.getHandle());
return;
}else{
l.statusUpdate(status);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
new file mode 100644
index 0000000..90db6a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+/**
+ * BitCommand that reports its outcome to a caller-supplied RpcOutcomeListener. Subclasses issue the actual RPC
+ * in doRpcCall; both the RPC outcome and a connection failure are forwarded to the listener.
+ */
+public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningBitCommand.class);
+
+  private final RpcOutcomeListener<T> listener;
+
+  public ListeningBitCommand(RpcOutcomeListener<T> listener) {
+    this.listener = listener;
+  }
+
+  /** Issues the RPC on the given connection, reporting its result to the supplied listener. */
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    // Hand the RPC a forwarding listener rather than the caller's listener instance itself.
+    final RpcOutcomeListener<T> forwarder = new RpcOutcomeListener<T>() {
+      @Override
+      public void failed(RpcException ex) {
+        listener.failed(ex);
+      }
+
+      @Override
+      public void success(T value) {
+        listener.success(value);
+      }
+    };
+    doRpcCall(forwarder, connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    // A freshly established connection is handled exactly like an already available one.
+    connectionAvailable(connection);
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    final String message = String.format("Command failed while establishing connection. Failure type %s.", type);
+    listener.failed(RpcException.mapException(message, t));
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 3df88b7..779085c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -45,5 +45,12 @@ public class QueryResultBatch {
public boolean hasData(){
return data != null;
}
+
+ /** Renders the batch header and data buffer for logging and debugging. */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder("QueryResultBatch [header=");
+   text.append(header);
+   text.append(", data=");
+   text.append(data);
+   text.append("]");
+   return text.toString();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
new file mode 100644
index 0000000..0aa7c86
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Encapsulates the future management of query submissions. This entails a potential race condition. Normal ordering is:
+ * 1. Submit query to be executed. 2. Receive QueryHandle for buffer management 3. Start receiving results batches for
+ * query.
+ *
+ * However, 3 could potentially occur before 2. As such, we need to handle this case and then do a switcheroo.
+ *
+ */
+public class QueryResultHandler {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+ private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+
+ public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener){
+ return new SubmissionListener(listener);
+ }
+
+ public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
+ final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+ UserResultsListener l = resultsListener.get(result.getQueryId());
+ // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
+ if (l != null) {
+ // logger.debug("Results listener available, using existing.");
+ l.resultArrived(batch);
+ if (result.getIsLastChunk()) {
+ resultsListener.remove(result.getQueryId(), l);
+ }
+ } else {
+ logger.debug("Results listener not available, creating a buffering listener.");
+ // manage race condition where we start getting results before we receive the queryid back.
+ BufferingListener bl = new BufferingListener();
+ l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+ if (l != null) {
+ l.resultArrived(batch);
+ } else {
+ bl.resultArrived(batch);
+ }
+ }
+ }
+
+ private class BufferingListener implements UserResultsListener {
+
+ private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+ private volatile UserResultsListener output;
+
+ public boolean transferTo(UserResultsListener l) {
+ synchronized (this) {
+ output = l;
+ boolean last = false;
+ for (QueryResultBatch r : results) {
+ l.resultArrived(r);
+ last = r.getHeader().getIsLastChunk();
+ }
+ return last;
+ }
+ }
+
+ @Override
+ public void resultArrived(QueryResultBatch result) {
+ synchronized (this) {
+ if (output == null) {
+ this.results.add(result);
+ } else {
+ output.resultArrived(result);
+ }
+ }
+ }
+
+ @Override
+ public void submissionFailed(RpcException ex) {
+ throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+ }
+
+ }
+
+ private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+ private UserResultsListener listener;
+
+ public SubmissionListener(UserResultsListener listener) {
+ super();
+ this.listener = listener;
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ listener.submissionFailed(ex);
+ }
+
+ @Override
+ public void success(QueryId queryId) {
+ logger.debug("Received QueryId {} succesfully. Adding listener {}", queryId, listener);
+ UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+
+ // we need to deal with the situation where we already received results by the time we got the query id back. In
+ // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
+ // results during the transition
+ if (oldListener != null) {
+ logger.debug("Unable to place user results listener, buffering listener was already in place.");
+ if (oldListener instanceof BufferingListener) {
+ resultsListener.remove(oldListener);
+ boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+ // simply remove the buffering listener if we already have the last response.
+ if (all) {
+ resultsListener.remove(oldListener);
+ } else {
+ boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+ if (!replaced) throw new IllegalStateException();
+ }
+ } else {
+ throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+ }
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5d2e799..ad44ff2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -21,11 +21,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -36,115 +31,27 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.BasicClientWithConnection;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
import com.google.protobuf.MessageLite;
-public class UserClient extends BasicClientWithConnection<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
- private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+ private final QueryResultHandler queryResultHandler = new QueryResultHandler();
public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
- }
-
- public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
- this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
- return resultsListener.getFuture();
+ super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
}
- public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
- return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
- }
-
- private class BufferingListener extends UserResultsListener {
-
- private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private volatile UserResultsListener output;
-
- public boolean transferTo(UserResultsListener l) {
- lock.writeLock().lock();
- output = l;
- boolean last = false;
- for (QueryResultBatch r : results) {
- l.resultArrived(r);
- last = r.getHeader().getIsLastChunk();
- }
- if (future.isDone()) {
- l.set();
- }
- return last;
- }
-
- @Override
- public void resultArrived(QueryResultBatch result) {
- logger.debug("Result arrvied.");
- lock.readLock().lock();
- try {
- if (output == null) {
- this.results.add(result);
- } else {
- output.resultArrived(result);
- }
-
- } finally {
- lock.readLock().unlock();
- }
-
- }
-
- @Override
- public void submissionFailed(RpcException ex) {
- throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
- }
-
+ public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
+ send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
- private class SubmissionListener extends RpcOutcomeListener<QueryId> {
- private UserResultsListener listener;
-
- public SubmissionListener(UserResultsListener listener) {
- super();
- this.listener = listener;
- }
-
- @Override
- public void failed(RpcException ex) {
- listener.submissionFailed(ex);
- }
-
- @Override
- public void success(QueryId queryId) {
- logger.debug("Received QueryId {} succesfully. Adding listener {}", queryId, listener);
- UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
- // we need to deal with the situation where we already received results by the time we got the query id back. In
- // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
- // results during the transition
- if (oldListener != null) {
- logger.debug("Unable to place user results listener, buffering listener was already in place.");
- if (oldListener instanceof BufferingListener) {
- resultsListener.remove(oldListener);
- boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
- // simply remove the buffering listener if we already have the last response.
- if (all) {
- resultsListener.remove(oldListener);
- } else {
- boolean replaced = resultsListener.replace(queryId, oldListener, listener);
- if (!replaced) throw new IllegalStateException();
- }
- } else {
- throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
- }
- }
-
- }
-
+ public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
+ UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
+ this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort());
}
@Override
@@ -165,29 +72,7 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
switch (rpcType) {
case RpcType.QUERY_RESULT_VALUE:
- final QueryResult result = get(pBody, QueryResult.PARSER);
- final QueryResultBatch batch = new QueryResultBatch(result, dBody);
- UserResultsListener l = resultsListener.get(result.getQueryId());
-// logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
- if (l != null) {
-// logger.debug("Results listener available, using existing.");
- l.resultArrived(batch);
- if (result.getIsLastChunk()) {
- resultsListener.remove(result.getQueryId(), l);
- l.set();
- }
- } else {
- logger.debug("Results listener not available, creating a buffering listener.");
- // manage race condition where we start getting results before we receive the queryid back.
- BufferingListener bl = new BufferingListener();
- l = resultsListener.putIfAbsent(result.getQueryId(), bl);
- if (l != null) {
- l.resultArrived(batch);
- } else {
- bl.resultArrived(batch);
- }
- }
-
+ queryResultHandler.batchArrived(pBody, dBody);
return new Response(RpcType.ACK, Ack.getDefaultInstance());
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
@@ -196,18 +81,16 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
}
@Override
- protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
- return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+ protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+ logger.debug("Handling handshake from bit to user. {}", inbound);
+ if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) // reject a bit that reports a different RPC version
+ throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", UserRpcConfig.RPC_VERSION,
+ inbound.getRpcVersion())); // expected = our version, actual = the version the bit reported
- @Override
- protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
- logger.debug("Handling handshake from bit to user. {}", inbound);
- if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
- inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
- }
+ }
- };
+ @Override
+ protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3ce14f0..b1dbfe8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,17 +24,8 @@ import org.apache.drill.exec.rpc.RpcException;
import com.google.common.util.concurrent.SettableFuture;
-public abstract class UserResultsListener {
- SettableFuture<Void> future = SettableFuture.create();
+public interface UserResultsListener {
- final void set(){
- future.set(null);
- }
-
- Future<Void> getFuture(){
- return future;
- }
-
public abstract void submissionFailed(RpcException ex);
public abstract void resultArrived(QueryResultBatch result);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 406afc4..908af61 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -26,16 +26,15 @@ import io.netty.channel.EventLoopGroup;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.work.user.UserWorker;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -100,8 +99,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
super(channel);
}
- public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
- return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+ logger.debug("Sending result to client with {}", result);
+ send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
}
}
@@ -112,7 +112,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+ protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(UserClientConnection connection) {
return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3c4d9af..ed13748 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
public BootStrapContext(DrillConfig config) {
super();
this.config = config;
- this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+ this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-")); // NOTE(review): event loop cut from 4 threads to 1 - confirm this is intended, not a debugging leftover
this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
this.allocator = BufferAllocator.getAllocator(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 0337a68..199768f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.LocalCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.LocalClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
public class RemoteServiceSet implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -37,6 +38,7 @@ public class RemoteServiceSet implements Closeable{
this.coordinator = coordinator;
}
+
public DistributedCache getCache() {
return cache;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d6d3b9c..b07f274 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -53,12 +53,12 @@ public class ServiceEngine implements Closeable{
public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
- int bitPort = bitCom.start();
- return DrillbitEndpoint.newBuilder()
+ DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
.setAddress(InetAddress.getLocalHost().getHostAddress())
- .setBitPort(bitPort)
.setUserPort(userPort)
.build();
+ // partialEndpoint carries no bit port; bitCom.start() returns the endpoint to publish (presumably with the bit port filled in - confirm).
+ return bitCom.start(partialEndpoint);
}
public BitCom getBitCom(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
index f6a9786..9a72845 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -18,10 +18,9 @@
package org.apache.drill.exec.work;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+public abstract class EndpointListener<RET, V> extends BaseRpcOutcomeListener<RET>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
protected final DrillbitEndpoint endpoint;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
index 2900d99..554b398 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -65,6 +65,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
@Override
public void run() {
+ logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
return;
@@ -76,7 +77,12 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
try{
while(state.get() == FragmentState.RUNNING_VALUE){
if(!root.next()){
- updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ if(context.isFailed()){
+ updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+ }else{
+ updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ }
+
}
}
@@ -90,7 +96,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
}finally{
t.stop();
}
-
+ logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
}
private void internalFail(Throwable excep){