You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/22 03:39:05 UTC
[1/3] Clean up threading of client/server. Use the command pattern
for BitCom operations to abstract away connection failures. A single-exchange
remote query on one Drillbit now works. Next up: a single-exchange query across two Drillbits.
Updated Branches:
refs/heads/execwork e57a8d6d4 -> b8db98ad7
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index d3664a0..1170a1e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
@@ -40,7 +41,6 @@ import org.apache.drill.exec.work.batch.BitComHandler;
import org.apache.drill.exec.work.batch.BitComHandlerImpl;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
import org.apache.drill.exec.work.user.UserWorker;
import com.google.common.collect.Maps;
@@ -63,7 +63,7 @@ public class WorkManager implements Closeable{
private final BitComHandler bitComWorker;
private final UserWorker userWorker;
private final WorkerBee bee;
- private Executor executor = Executors.newFixedThreadPool(4);
+ private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("Working Thread - "));
private final EventThread eventThread;
public WorkManager(BootStrapContext context){
@@ -148,9 +148,10 @@ public class WorkManager implements Closeable{
public void run() {
try {
while(true){
- logger.debug("Checking for pending work tasks.");
+ logger.debug("Polling for pending work tasks.");
Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
if(r != null){
+ logger.debug("Starting pending task {}", r);
executor.execute(r);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index 5dacb71..ec03392 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -67,18 +67,23 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
public abstract void streamFinished(int minorFragmentId);
- public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+ boolean decremented = false;
if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
int rem = remainingRequired.decrementAndGet();
if (rem == 0) {
parentAccounter.decrementAndGet();
+ decremented = true;
}
}
if(batch.getHeader().getIsLastBatch()){
streamFinished(minorFragmentId);
}
getBuffer(minorFragmentId).enqueue(throttle, batch);
+ return decremented;
}
+
+
protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index ff091d7..b5a497e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -24,8 +24,7 @@ import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
interface BatchCollector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
-
- public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+ public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
public int getOppositeMajorFragmentId();
public RawBatchBuffer[] getBuffers();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 9b227da..edda714 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -159,14 +159,11 @@ public class BitComHandlerImpl implements BitComHandler {
// Create a handler if there isn't already one.
if(handler == null){
-
-
PlanFragment fragment = bee.getContext().getCache().getFragment(handle);
if(fragment == null){
logger.error("Received batch where fragment was not in cache.");
return Acks.FAIL;
}
-
IncomingFragmentHandler newHandler = new RemoteFragmentHandler(fragment, bee.getContext(), bee.getContext().getBitCom().getTunnel(fragment.getForeman()));
@@ -174,7 +171,7 @@ public class BitComHandlerImpl implements BitComHandler {
handler = handlers.putIfAbsent(fragment.getHandle(), newHandler);
if(handler == null){
- // we added a handler, inform foreman that we did so. This way, the foreman can track status. We also tell foreman that we don't need inform ourself.
+ // we added a handler, inform the bee that we did so. This way, the foreman can track status.
bee.addFragmentPendingRemote(newHandler);
handler = newHandler;
}
@@ -182,10 +179,12 @@ public class BitComHandlerImpl implements BitComHandler {
boolean canRun = handler.handle(connection.getConnectionThrottle(), new RawFragmentBatch(fragmentBatch, body));
if(canRun){
+ logger.debug("Arriving batch means local batch can run, starting local batch.");
// if we've reached the canRun threshold, we'll proceed. This expects handler.handle() to only return a single true.
bee.startFragmentPendingRemote(handler);
}
- if(handler.isDone()){
+ if(fragmentBatch.getIsLastBatch() && !handler.isWaiting()){
+ logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(), handler.isWaiting());
handlers.remove(handler.getHandle());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 20775c5..264c4b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,6 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.work.batch;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,6 +29,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -42,7 +44,10 @@ public class IncomingBuffers {
public IncomingBuffers(PhysicalOperator root) {
Map<Integer, BatchCollector> counts = Maps.newHashMap();
- root.accept(new CountRequiredFragments(), counts);
+ CountRequiredFragments reqFrags = new CountRequiredFragments();
+ root.accept(reqFrags, counts);
+
+ logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), counts);
streamsRemaining.set(remainingRequired.get());
fragCounts = ImmutableMap.copyOf(counts);
}
@@ -53,11 +58,13 @@ public class IncomingBuffers {
if(batch.getHeader().getIsLastBatch()){
streamsRemaining.decrementAndGet();
}
+ int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
+ BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
+ if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d.", sendMajorFragmentId));
+ boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
- BatchCollector fSet = fragCounts.get(batch.getHeader().getSendingMajorFragmentId());
- if (fSet == null) throw new FragmentSetupException("We received a major fragment id that we were not expecting.");
- fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
- return remainingRequired.get() == 0;
+ // we should only return true if remaining required has been decremented and is currently equal to zero.
+ return decremented && remainingRequired.get() == 0;
}
public int getRemainingRequired() {
@@ -75,7 +82,7 @@ public class IncomingBuffers {
* Designed to setup initial values for arriving fragment accounting.
*/
public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, BatchCollector>, RuntimeException> {
-
+
@Override
public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
BatchCollector set;
@@ -84,7 +91,7 @@ public class IncomingBuffers {
} else {
set = new PartitionedCollector(remainingRequired, receiver);
}
-
+
counts.put(set.getOppositeMajorFragmentId(), set);
remainingRequired.incrementAndGet();
return null;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index e21d69a..93868a7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -27,7 +27,7 @@ public class MergingCollector extends AbstractFragmentCollector{
public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
super(parentAccounter, receiver, 1);
- streamsRunning = new AtomicInteger(parentAccounter.get());
+ streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
}
@Override
@@ -35,10 +35,11 @@ public class MergingCollector extends AbstractFragmentCollector{
return buffers[0];
}
- @Override
+
public void streamFinished(int minorFragmentId) {
if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 116ca26..25b5884 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -36,6 +36,7 @@ public class PartitionedCollector extends AbstractFragmentCollector{
public void streamFinished(int minorFragmentId) {
buffers[minorFragmentId].finished();
}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
index f97d878..71ae576 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
@@ -65,7 +65,7 @@ public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
}
}
- return null;
+ return b;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index dea8282..f86c4fb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -24,12 +24,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.ImplCreator;
-import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
@@ -45,8 +42,8 @@ import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.util.AtomicState;
@@ -126,14 +123,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
void cleanupAndSendResult(QueryResult result){
bee.retireForeman(this);
- initiatingClient.sendResult(new QueryWritableBatch(result)).addLightListener(new ResponseSendListener());
+ initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result));
}
- private class ResponseSendListener extends RpcOutcomeListener<Ack> {
+ private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@Override
public void failed(RpcException ex) {
- logger
- .info(
+ logger.info(
"Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
ex);
}
@@ -193,12 +189,17 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
fail("Failure while fragmenting query.", e);
return;
}
+
+
+
+
PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
SimpleParallelizer parallelizer = new SimpleParallelizer();
try {
QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
+ this.context.getBitCom().getListeners().addFragmentStatusListener(work.getRootFragment().getHandle(), fragmentManager);
List<PlanFragment> leafFragments = Lists.newArrayList();
// store fragments in distributed grid.
@@ -213,7 +214,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments);
- } catch (ExecutionSetupException e) {
+ } catch (ExecutionSetupException | RpcException e) {
fail("Failure while setting up query.", e);
}
@@ -245,9 +246,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
return this.state.getState();
}
- public boolean rootCoorespondsTo(FragmentHandle handle){
- throw new UnsupportedOperationException();
- }
class ForemanManagerListener{
void fail(String message, Throwable t) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 20797b8..f069db7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -64,12 +64,13 @@ class RunningFragmentManager implements FragmentStatusListener{
this.foreman = foreman;
this.tun = tun;
this.remainingFragmentCount = new AtomicInteger(0);
+
}
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
remainingFragmentCount.set(leafFragments.size()+1);
- // set up the root framgnet first so we'll have incoming buffers available.
+ // set up the root fragment first so we'll have incoming buffers available.
{
IncomingBuffers buffers = new IncomingBuffers(rootOperator);
@@ -97,13 +98,13 @@ class RunningFragmentManager implements FragmentStatusListener{
private void sendRemoteFragment(PlanFragment fragment){
map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
- tun.get(fragment.getAssignment()).sendFragment(fragment).addLightListener(listener);
+ tun.get(fragment.getAssignment()).sendFragment(listener, fragment);
}
@Override
public void statusUpdate(FragmentStatus status) {
-
+ logger.debug("New fragment status was provided to Foreman of {}", status);
switch(status.getState()){
case AWAITING_ALLOCATION:
updateStatus(status);
@@ -205,6 +206,7 @@ class RunningFragmentManager implements FragmentStatusListener{
@Override
public void failed(RpcException ex) {
+ logger.debug("Failure while sending fragment. Stopping query.", ex);
stopQuery();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
index b4e9308..b23f003 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
@@ -44,6 +44,6 @@ public interface IncomingFragmentHandler {
public abstract FragmentRunner getRunnable();
public abstract void cancel();
- public boolean isDone();
+ public boolean isWaiting();
public abstract FragmentHandle getHandle();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
index 3f710ed..5ffd09a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
@@ -60,8 +60,8 @@ public class LocalFragmentHandler implements IncomingFragmentHandler{
}
@Override
- public boolean isDone() {
- return cancel || isDone();
+ public boolean isWaiting() {
+ return !buffers.isDone() && !cancel;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index 70d7e93..4a5dbf2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -113,8 +113,8 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
}
@Override
- public boolean isDone() {
- return cancel || buffers.isDone();
+ public boolean isWaiting() {
+ return !buffers.isDone() && !cancel;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
index 7c6bfe5..586ccf6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import static org.junit.Assert.*;
+
import java.util.List;
import org.apache.drill.common.util.FileUtils;
@@ -26,26 +28,29 @@ import org.apache.drill.exec.proto.UserProtos.QueryType;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-@Ignore
+//@Ignore
public class DistributedFragmentRun extends PopUnitTestBase{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedFragmentRun.class);
@Test
- public void simpleDistributedQuery() throws Exception{
+ public void oneBitOneExchangeRun() throws Exception{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+
+ try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
bit1.run();
- bit2.run();
client.connect();
List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"), Charsets.UTF_8));
- System.out.println(results);
+ int count = 0;
+ for(QueryResultBatch b : results){
+ count += b.getHeader().getRowCount();
+ }
+ assertEquals(100, count);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
index 7b7ab8e..1e0c5b6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
@@ -23,28 +23,12 @@ import static org.junit.Assert.assertNull;
import java.io.IOException;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.fragment.Fragment;
-import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
-import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.planner.PhysicalPlanReader;
-import org.apache.drill.exec.planner.SimpleExecPlanner;
+import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.QueryWorkUnit;
-import org.junit.BeforeClass;
import org.junit.Test;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
public class CheckFragmenter extends PopUnitTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckFragmenter.class);
@@ -77,10 +61,11 @@ public class CheckFragmenter extends PopUnitTestBase {
assertNotNull(b.getSendingExchange());
}
-
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
index 6f229a3..e1db639 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
@@ -17,16 +17,15 @@
******************************************************************************/
package org.apache.drill.exec.pop;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -41,26 +40,46 @@ public class FragmentChecker extends PopUnitTestBase{
@Test
public void checkSimpleExchangePlan() throws Exception{
+ print("/physical_simpleexchange.json", 2, 3);
+
+ }
+
+
+ private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
+ System.out.println(String.format("=================Building plan fragments for [%s]. Allowing %d total Drillbits.==================", fragmentFile, bitCount));
PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
- Fragment fragmentRoot = getRootFragment(ppr, "/physical_simpleexchange.json");
+ Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
SimpleParallelizer par = new SimpleParallelizer();
+ List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+ DrillbitEndpoint localBit = null;
+ for(int i =0; i < bitCount; i++){
+ DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234+i).build();
+ if(i ==0) localBit = b1;
+ endpoints.add(b1);
+ }
- DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234).build();
- DrillbitEndpoint b2 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(2345).build();
- QueryWorkUnit qwu = par.getFragments(b1, QueryId.getDefaultInstance(), Lists.newArrayList(b1, b2), ppr, fragmentRoot, planningSet, 10);
- assertEquals(qwu.getFragments().size(), 3);
- System.out.println("=========ROOT FRAGMENT=========");
+ QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10);
+ System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
+
System.out.print(qwu.getRootFragment().getFragmentJson());
for(PlanFragment f : qwu.getFragments()){
- System.out.println("=========");
+ System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
System.out.print(f.getFragmentJson());
}
+
+ //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
+
logger.debug("Planning Set {}", planningSet);
+ }
+
+ @Test
+ public void validateSingleExchangeFragment() throws Exception{
+ print("/physical_single_exchange.json", 1, 2);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 9684e9f..038b093 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -19,27 +19,25 @@ package org.apache.drill.exec.server;
import io.netty.buffer.ByteBuf;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.bit.BitClient;
-import org.apache.drill.exec.rpc.bit.BitComImpl;
import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager;
import org.apache.drill.exec.rpc.bit.BitRpcConfig;
import org.apache.drill.exec.rpc.bit.BitServer;
+import org.apache.drill.exec.rpc.bit.BitTunnel.SendFragmentStatus;
+import org.apache.drill.exec.rpc.bit.ConnectionManagerRegistry;
import org.apache.drill.exec.rpc.bit.ListenerPool;
import org.apache.drill.exec.work.batch.BitComHandler;
import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
import org.junit.Test;
-import com.google.common.collect.Maps;
-
public class TestBitRpc {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
@@ -47,13 +45,19 @@ public class TestBitRpc {
public void testBasicConnectionAndHandshake() throws Exception{
int port = 1234;
BootStrapContext c = new BootStrapContext(DrillConfig.create());
- ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
- BitServer server = new BitServer(new BitComTestHandler(), c, registry, new ListenerPool(2));
+ final BitComTestHandler handler = new BitComTestHandler();
+ final ListenerPool listeners = new ListenerPool(2);
+ ConnectionManagerRegistry registry = new ConnectionManagerRegistry(handler, c, listeners);
+ BitServer server = new BitServer(handler, c, registry, listeners);
port = server.bind(port);
+ DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build();
+ registry.setEndpoint(ep);
for(int i =0; i < 10; i++){
- BitClient client = new BitClient(DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build(), null, new BitComTestHandler(), c, registry, new ListenerPool(2));
- client.connect();
-
+ try(BitConnectionManager cm = new BitConnectionManager(ep, ep, handler, c, listeners)){
+ SendFragmentStatus cmd = new SendFragmentStatus(FragmentStatus.getDefaultInstance());
+ cm.runCommand(cmd);
+ cmd.getFuture().checkedGet();
+ }
}
System.out.println("connected");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
index 675ecfb..0e1921e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
@@ -17,7 +17,6 @@
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "INT", mode: "REQUIRED"}
]}
-
]
},
{
[2/3] Clean up threading of client/server. Utilize command pattern
for BitCom stuff to abstract away connection failures. Works on one bit
single exchange remote query now. Next up, two bit single exchange query.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index 4ba99a1..82a6aa6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -22,57 +22,54 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
import com.google.protobuf.MessageLite;
-public class BitClient extends BasicClient<RpcType, BitConnection>{
+public class BitClient extends BasicClient<RpcType, BitConnection, BitHandshake, BitHandshake>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
private final BitComHandler handler;
- private final DrillbitEndpoint endpoint;
- private BitConnection connection;
- private final AvailabilityListener openListener;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+ private final DrillbitEndpoint remoteEndpoint;
+ private volatile BitConnection connection;
private final ListenerPool listeners;
+ private final CloseHandlerCreator closeHandlerFactory;
+ private final DrillbitEndpoint localIdentity;
- public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
- super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
-
- this.endpoint = endpoint;
+ public BitClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, BitComHandler handler, BootStrapContext context, CloseHandlerCreator closeHandlerFactory, ListenerPool listeners) {
+ super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER);
+ this.localIdentity = localEndpoint;
+ this.remoteEndpoint = remoteEndpoint;
this.handler = handler;
- this.openListener = openListener;
- this.registry = registry;
this.listeners = listeners;
+ this.closeHandlerFactory = closeHandlerFactory;
}
- public BitHandshake connect() throws RpcException, InterruptedException{
- BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
- connection.setEndpoint(endpoint);
- return bs;
+ public void connect(RpcConnectionHandler<BitConnection> connectionHandler) {
+ connectAsClient(connectionHandler, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getBitPort());
}
@SuppressWarnings("unchecked")
@Override
public BitConnection initRemoteConnection(Channel channel) {
- this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+ this.connection = new BitConnection(channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, listeners);
return connection;
}
@Override
protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
- return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
+ return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
}
@Override
@@ -86,18 +83,15 @@ public class BitClient extends BasicClient<RpcType, BitConnection>{
}
@Override
- protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
- return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
-
- @Override
- protected void validateHandshake(BitHandshake inbound) throws Exception {
- logger.debug("Handling handshake from bit server to bit client. {}", inbound);
- if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
- }
+ protected void validateHandshake(BitHandshake handshake) throws RpcException {
+ if(handshake.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+ }
- };
+ @Override
+ protected void finalizeConnection(BitHandshake handshake, BitConnection connection) {
+ connection.setEndpoint(handshake.getEndpoint());
}
-
+
public BitConnection getConnection(){
return this.connection;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index c60d36b..f7f508e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -40,11 +40,17 @@ public interface BitCom extends Closeable {
*/
public BitTunnel getTunnel(DrillbitEndpoint node) ;
- public int start() throws InterruptedException, DrillbitStartupException;
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
/**
* Register an incoming batch handler for a local foreman.
* @param handler
*/
public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
+
+ /**
+ * Get ListenerPool
+ * @return
+ */
+ public ListenerPool getListeners();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index c98be44..d1cadc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -18,157 +18,68 @@
package org.apache.drill.exec.rpc.bit;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
/**
- * Manages communication tunnels between nodes.
+ * Manages communication tunnels between nodes.
*/
public class BitComImpl implements BitCom {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
private final ListenerPool listeners;
private volatile BitServer server;
private final BitComHandler handler;
private final BootStrapContext context;
-
- // TODO: this executor should be removed.
- private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
+ private final ConnectionManagerRegistry connectionRegistry;
public BitComImpl(BootStrapContext context, BitComHandler handler) {
super();
this.handler = handler;
this.context = context;
this.listeners = new ListenerPool(8);
+ this.connectionRegistry = new ConnectionManagerRegistry(handler, context, listeners);
}
- public int start() throws InterruptedException, DrillbitStartupException {
- server = new BitServer(handler, context, registry, listeners);
+ @Override
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+ server = new BitServer(handler, context, connectionRegistry, listeners);
int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
- return server.bind(port);
- }
-
- private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
-
-
- SettableFuture<BitConnection> future = SettableFuture.create();
- BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
- BitConnection t = null;
-
- if (check) {
- t = registry.get(endpoint);
-
- if (t != null) {
- future.set(t);
- return checkedFuture;
- }
- }
-
- try {
- AvailWatcher watcher = new AvailWatcher(future);
- BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
- c.connect();
- return checkedFuture;
- } catch (InterruptedException | RpcException e) {
- future.setException(new FragmentSetupException("Unable to open connection"));
- return checkedFuture;
- }
-
+ port = server.bind(port);
+ DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setBitPort(port).build();
+ connectionRegistry.setEndpoint(completeEndpoint);
+ return completeEndpoint;
}
- private class AvailWatcher implements AvailabilityListener{
- final SettableFuture<BitConnection> future;
-
- public AvailWatcher(SettableFuture<BitConnection> future) {
- super();
- this.future = future;
- }
-
- @Override
- public void isAvailable(BitConnection connection) {
- future.set(connection);
- }
-
- }
- BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
- BitConnection t = registry.get(endpoint);
- if(t != null) return t;
- return this.getNode(endpoint, false).checkedGet();
+
+ public ListenerPool getListeners() {
+ return listeners;
}
-
- CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
- return this.getNode(endpoint, true);
- }
-
-
@Override
- public BitTunnel getTunnel(DrillbitEndpoint endpoint){
- BitConnection t = registry.get(endpoint);
- if(t == null){
- return new BitTunnel(exec, endpoint, this, t);
- }else{
- return new BitTunnel(exec, endpoint, this, this.getNode(endpoint, false));
- }
+ public BitTunnel getTunnel(DrillbitEndpoint endpoint) {
+ return new BitTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
}
-
- /**
- * A future which remaps exceptions to a BitComException.
- * @param <T>
- */
- private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
-
- protected BitComFuture(ListenableFuture<T> delegate) {
- super(delegate);
- }
-
- @Override
- protected RpcException mapException(Exception e) {
- Throwable t = e;
- if(e instanceof ExecutionException){
- t = e.getCause();
- }
-
- if(t instanceof RpcException) return (RpcException) t;
- return new RpcException(t);
- }
+ @Override
+ public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+ this.handler.registerIncomingFragmentHandler(handler);
}
public void close() {
Closeables.closeQuietly(server);
- for (BitConnection bt : registry.values()) {
- bt.shutdownIfClient();
+ for (BitConnectionManager bt : connectionRegistry) {
+ bt.close();
}
}
- @Override
- public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
- this.handler.registerIncomingFragmentHandler(handler);
- }
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
new file mode 100644
index 0000000..692c63e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+
+import com.google.protobuf.MessageLite;
+
+public interface BitCommand<T extends MessageLite> extends RpcConnectionHandler<BitConnection>{
+
+ public abstract void connectionAvailable(BitConnection connection);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
index 73980f9..f85ea74 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -17,6 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
@@ -35,31 +36,35 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
public class BitConnection extends RemoteConnection{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class);
private final RpcBus<RpcType, BitConnection> bus;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
private final ListenerPool listeners;
-
- private final AvailabilityListener listener;
private volatile DrillbitEndpoint endpoint;
private volatile boolean active = false;
private final UUID id;
- public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+ public BitConnection(Channel channel, RpcBus<RpcType, BitConnection> bus, ListenerPool listeners){
super(channel);
this.bus = bus;
- this.registry = registry;
// we use a local listener pool unless a global one is provided.
this.listeners = listeners != null ? listeners : new ListenerPool(2);
- this.listener = listener;
this.id = UUID.randomUUID();
}
+
+ void setEndpoint(DrillbitEndpoint endpoint){
+ assert this.endpoint == null : "Endpoint should only be set once (only in the case in incoming server requests).";
+ this.endpoint = endpoint;
+ active = true;
+ }
protected DrillbitEndpoint getEndpoint() {
return endpoint;
@@ -69,48 +74,12 @@ public class BitConnection extends RemoteConnection{
return listeners;
}
- protected void setEndpoint(DrillbitEndpoint endpoint) {
- Preconditions.checkNotNull(endpoint);
- Preconditions.checkArgument(this.endpoint == null);
-
- this.endpoint = endpoint;
- BitServer.logger.debug("Adding new endpoint to available BitServer connections. Endpoint: {}.", endpoint);
- synchronized(this){
- BitConnection c = registry.putIfAbsent(endpoint, this);
-
- if(c != null){ // the registry already has a connection like this
-
- // give the awaiting future an alternative connection.
- if(listener != null){
- listener.isAvailable(c);
- }
-
- // shut this down if this is a client as it won't be available in the registry.
- // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other. This shouldn't cause a problem.
- logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
- shutdownIfClient();
-
- }
- active = true;
- if(listener != null) listener.isAvailable(this);
- }
- }
-
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
- return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
- }
-
- public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
- return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
- }
- public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
- return bus.send(this, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){
+ bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
}
- public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
- return bus.send(this, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
- }
public void disable(){
active = false;
@@ -140,27 +109,7 @@ public class BitConnection extends RemoteConnection{
return true;
}
- public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
- return new CloseHandler(this, parent);
- }
-
- private class CloseHandler implements GenericFutureListener<ChannelFuture>{
- private BitConnection connection;
- private GenericFutureListener<ChannelFuture> parent;
-
- public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
- super();
- this.connection = connection;
- this.parent = parent;
- }
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
- parent.operationComplete(future);
- }
-
- }
public void shutdownIfClient(){
if(bus.isClient()) Closeables.closeQuietly(bus);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
index 0160d24..d99bb22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -17,58 +17,152 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.protobuf.MessageLite;
-public class BitConnectionManager {
+/**
+ * Manages all connections between two particular bits.
+ */
+public class BitConnectionManager implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
- private final int maxAttempts;
- private final BitComImpl com;
private final DrillbitEndpoint endpoint;
- private final AtomicReference<BitConnection> connection;
- private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+ private final AtomicReference<BitConnection> connectionHolder;
+ private final BitComHandler handler;
+ private final BootStrapContext context;
+ private final ListenerPool listenerPool;
+ private final DrillbitEndpoint localIdentity;
+
+ public BitConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+ assert remoteEndpoint != null : "Endpoint cannot be null.";
+ assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
+ assert remoteEndpoint.getBitPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k. Was set to %d.", remoteEndpoint.getBitPort());
+
+ this.connectionHolder = new AtomicReference<BitConnection>();
+ this.endpoint = remoteEndpoint;
+ this.localIdentity = localIdentity;
+ this.handler = handler;
+ this.context = context;
+ this.listenerPool = listenerPool;
+ }
+
+ public <R extends MessageLite> BitCommand<R> runCommand(BitCommand<R> cmd){
+ logger.debug("Running command {}", cmd);
+ BitConnection connection = connectionHolder.get();
+ if(connection != null){
+ if(connection.isActive()){
+ cmd.connectionAvailable(connection);
+ return cmd;
+ }else{
+ // remove the old connection. (don't worry if we fail since someone else should have done it.)
+ connectionHolder.compareAndSet(connection, null);
+ }
+ }
+
+ /** We've arrived here without a connection, let's make sure only one of us makes a connection. (fyi, another endpoint could create a reverse connection) **/
+ synchronized(this){
+ connection = connectionHolder.get();
+ if(connection != null){
+ cmd.connectionAvailable(connection);
+ }else{
+ BitClient client = new BitClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator(), listenerPool);
+
+ client.connect(new ConnectionListeningDecorator(cmd, !endpoint.equals(localIdentity)));
+ }
+ return cmd;
+
+ }
+ }
+
+ CloseHandlerCreator getCloseHandlerCreator(){
+ return new CloseHandlerCreator();
+ }
- BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
- assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
- this.com = com;
- this.connection = new AtomicReference<BitConnection>(connection);
- this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
- this.endpoint = endpoint;
- this.maxAttempts = maxAttempts;
+ /** Factory for close handlers **/
+ class CloseHandlerCreator{
+ public GenericFutureListener<ChannelFuture> getHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent){
+ return new CloseHandler(connection, parent);
+ }
}
- BitConnection getConnection(int attempt) throws RpcException{
- BitConnection con = connection.get();
+
+
+ /**
+ * Listens for connection closes and clears connection holder.
+ */
+ private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+ private BitConnection connection;
+ private GenericFutureListener<ChannelFuture> parent;
- if(con != null){
- if(con.isActive()) return con;
- connection.compareAndSet(con, null);
+ public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+ super();
+ this.connection = connection;
+ this.parent = parent;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ connectionHolder.compareAndSet(connection, null);
+ parent.operationComplete(future);
}
- CheckedFuture<BitConnection, RpcException> fut = future.get();
+ }
+
+ /**
+ * Decorate a connection creation so that we capture a success and keep it available for future requests. If we have raced and another is already available... we return that one and close things down on this one.
+ */
+ private class ConnectionListeningDecorator implements RpcConnectionHandler<BitConnection>{
+
+ private final RpcConnectionHandler<BitConnection> delegate;
+ private final boolean closeOnDupe;
+
+ public ConnectionListeningDecorator(RpcConnectionHandler<BitConnection> delegate, boolean closeOnDupe) {
+ this.delegate = delegate;
+ this.closeOnDupe = closeOnDupe;
+ }
- if(fut != null){
- try{
- return fut.checkedGet();
- }catch(RpcException ex){
- future.compareAndSet(fut, null);
- if(attempt < maxAttempts){
- return getConnection(attempt + 1);
- }else{
- throw ex;
+ @Override
+ public void connectionSucceeded(BitConnection incoming) {
+ BitConnection connection = connectionHolder.get();
+ while(true){
+ boolean setted = connectionHolder.compareAndSet(null, incoming);
+ if(setted){
+ connection = incoming;
+ break;
}
+ connection = connectionHolder.get();
+ if(connection != null) break;
+ }
+
+
+ if(connection == incoming){
+ delegate.connectionSucceeded(connection);
+ }else{
+
+ if(closeOnDupe){
+ // close the incoming because another channel was created in the mean time (unless this is a self connection).
+ logger.debug("Closing incoming connection because a connection was already set.");
+ incoming.getChannel().close();
+ }
+ delegate.connectionSucceeded(connection);
}
}
-
- // no checked future, let's make one.
- fut = com.getConnectionAsync(endpoint);
- future.compareAndSet(null, fut);
- return getConnection(attempt);
+
+ @Override
+ public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+ delegate.connectionFailed(type, t);
+ }
}
@@ -76,5 +170,20 @@ public class BitConnectionManager {
return endpoint;
}
+ /**
+ * Offers a server-side connection to this manager. It is stored only when no connection is currently
+ * held; otherwise the existing one is kept and the result of the compare-and-set is ignored.
+ */
+ public void addServerConnection(BitConnection connection){
+ // if the connection holder is not set, set it to this incoming connection.
+ logger.debug("Setting server connection.");
+ this.connectionHolder.compareAndSet(null, connection);
+ }
+
+ /** Atomically detaches the currently held connection, if any, and closes its channel. */
+ @Override
+ public void close() {
+ BitConnection c = connectionHolder.getAndSet(null);
+ if(c != null){
+ c.getChannel().close();
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index 88ac6cc..d4665a8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -22,18 +22,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.BitComHandler;
@@ -43,13 +38,14 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
private final BitComHandler handler;
- private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
private final ListenerPool listeners;
+ private final ConnectionManagerRegistry connectionRegistry;
+ private volatile ProxyCloseHandler proxyCloseHandler;
- public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+ /**
+ * Creates the server side of bit-to-bit RPC.
+ *
+ * @param handler stored in this server; its use is outside the lines shown here
+ * @param context supplies the allocator and the bit event loop group handed to the base server
+ * @param connectionRegistry consulted during the handshake to find the connection manager for the remote endpoint
+ * @param listeners listener pool passed to every BitConnection this server creates
+ */
+ public BitServer(BitComHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry, ListenerPool listeners) {
super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
this.handler = handler;
- this.registry = registry;
+ this.connectionRegistry = connectionRegistry;
this.listeners = listeners;
}
@@ -65,23 +61,36 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
+ /**
+ * Wraps the base class close handler in a ProxyCloseHandler so that its target can be replaced later
+ * (the handshake handler swaps in a handler obtained from the connection manager).
+ * NOTE(review): the proxy is kept in a single server-level field that is overwritten on every call. If this
+ * method is invoked once per connection, a handshake may end up updating the proxy of a newer connection — confirm.
+ */
@Override
protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
- return connection.getCloseHandler(super.getCloseHandler(connection));
+ this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+ return proxyCloseHandler;
}
+ /** Creates the BitConnection for the given channel, passing this server and its listener pool. */
@Override
public BitConnection initRemoteConnection(Channel channel) {
- return new BitConnection(null, channel, this, registry, listeners);
+ return new BitConnection(channel, this, listeners);
}
@Override
- protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+ protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler(final BitConnection connection) {
return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
+ /**
+ * Validates an inbound bit handshake, binds the connection to the remote endpoint's connection manager
+ * and builds the handshake reply.
+ *
+ * @param inbound handshake received from the other bit
+ * @return a handshake carrying this bit's RPC version
+ * @throws RpcException if the RPC versions differ, or if the handshake has no endpoint, an empty address or a bit port below 1
+ */
@Override
public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
- logger.debug("Handling handshake from other bit. {}", inbound);
+// logger.debug("Handling handshake from other bit. {}", inbound);
- if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+ // "Expected" is the version this server speaks, "actual" is what the peer sent (the arguments were previously swapped).
+ if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, inbound.getRpcVersion()));
+ if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getBitPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information. Received %s.", inbound.getEndpoint()));
+ connection.setEndpoint(inbound.getEndpoint());
+
+ // look up (or create on first use) the connection manager for the remote endpoint.
+ BitConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
+
+ // update the close handler.
+ // NOTE(review): proxyCloseHandler is a server-level field reassigned by getCloseHandler; with several connections
+ // being set up concurrently this may not be the proxy that belongs to this connection — confirm.
+ proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+
+ // add to the connection manager.
+ manager.addServerConnection(connection);
+
return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
}
@@ -89,5 +98,30 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
}
+ /**
+ * Close listener that forwards to a replaceable target listener. The target is held in a volatile field,
+ * so a handler installed through setHandler is visible to whichever thread later calls operationComplete.
+ */
+ private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
+
+ private volatile GenericFutureListener<ChannelFuture> handler;
+
+ public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
+ super();
+ this.handler = handler;
+ }
+
+
+ // Returns the listener currently being forwarded to.
+ public GenericFutureListener<ChannelFuture> getHandler() {
+ return handler;
+ }
+
+
+ // Replaces the listener that future operationComplete calls are forwarded to.
+ public void setHandler(GenericFutureListener<ChannelFuture> handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ handler.operationComplete(future);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 652fa52..83b7959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,95 +17,79 @@
******************************************************************************/
package org.apache.drill.exec.rpc.bit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-/**
- * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
- * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
- * and action. A better approach should be done.
- */
public class BitTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
- private static final int MAX_ATTEMPTS = 3;
-
private final BitConnectionManager manager;
- private final Executor exec;
-
+ private final DrillbitEndpoint endpoint;
- public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
- this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
- this.exec = exec;
- }
-
- public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
- CheckedFuture<BitConnection, RpcException> future) {
- this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
- this.exec = exec;
+ /**
+ * Creates a tunnel whose commands are run through the given connection manager.
+ * NOTE(review): endpoint is stored here, but getEndpoint() returns manager.getEndpoint() — confirm the two always agree.
+ */
+ public BitTunnel(DrillbitEndpoint endpoint, BitConnectionManager manager) {
+ this.manager = manager;
+ this.endpoint = endpoint;
}
+ /** Returns the endpoint reported by the underlying connection manager. */
public DrillbitEndpoint getEndpoint(){
return manager.getEndpoint();
}
- private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
- exec.execute(command);
- return command;
- }
-
- public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
- return submit(new SendBatch(batch, context));
+ /**
+ * Hands a SendBatch command for the given batch to the connection manager. Nothing is returned;
+ * the outcome is delivered to outcomeListener.
+ */
+ public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentContext context, FragmentWritableBatch batch) {
+ SendBatch b = new SendBatch(outcomeListener, batch, context);
+ manager.runCommand(b);
}
- public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
- return submit(new SendFragment(fragment));
+ /**
+ * Hands a SendFragment command for the given plan fragment to the connection manager. Nothing is returned;
+ * the outcome is delivered to outcomeListener.
+ */
+ public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){
+ SendFragment b = new SendFragment(outcomeListener, fragment);
+ manager.runCommand(b);
}
-
- public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
- return submit(new CancelFragment(handle));
+
+ /**
+ * Hands a CancelFragment command for the given handle to the connection manager.
+ *
+ * @return the command's future, obtained after the command has been passed to runCommand
+ */
+ public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+ CancelFragment b = new CancelFragment(handle);
+ manager.runCommand(b);
+ return b.getFuture();
}
-
+
+ /**
+ * Hands a SendFragmentStatus command for the given status to the connection manager.
+ *
+ * @return the command's future, obtained after the command has been passed to runCommand
+ */
public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
- return submit(new SendFragmentStatus(status));
+ SendFragmentStatus b = new SendFragmentStatus(status);
+ manager.runCommand(b);
+ return b.getFuture();
}
- public class SendBatch extends BitCommand<Ack> {
+ /**
+ * Listener-based command that sends a record batch: the batch header is the message body and the batch
+ * buffers are passed along as the trailing data of a REQ_RECORD_BATCH call that expects an Ack.
+ * NOTE(review): context is stored but not used by doRpcCall in this version.
+ */
+ public static class SendBatch extends ListeningBitCommand<Ack> {
final FragmentWritableBatch batch;
final FragmentContext context;
- public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
- super();
+ public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch, FragmentContext context) {
+ super(listener);
this.batch = batch;
this.context = context;
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- logger.debug("Sending record batch. {}", batch);
- return connection.sendRecordBatch(context, batch);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
}
+ // Only the batch header is rendered; the buffers are left out of the string form.
+ @Override
+ public String toString() {
+ return "SendBatch [batch.header=" + batch.getHeader() + "]";
+ }
+
+
}
- public class SendFragmentStatus extends BitCommand<Ack> {
+ public static class SendFragmentStatus extends FutureBitCommand<Ack> {
final FragmentStatus status;
public SendFragmentStatus(FragmentStatus status) {
@@ -114,12 +98,13 @@ public class BitTunnel {
}
+ /** Sends the status as a REQ_FRAGMENT_STATUS call expecting an Ack; the outcome goes to outcomeListener. */
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.sendFragmentStatus(status);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
}
+
}
- public class CancelFragment extends BitCommand<Ack> {
+ public static class CancelFragment extends FutureBitCommand<Ack> {
final FragmentHandle handle;
public CancelFragment(FragmentHandle handle) {
@@ -128,109 +113,23 @@ public class BitTunnel {
}
+ /** Sends the fragment handle as a REQ_CANCEL_FRAGMENT call expecting an Ack; the outcome goes to outcomeListener. */
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.cancelFragment(handle);
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
}
}
- public class SendFragment extends BitCommand<Ack> {
+ public static class SendFragment extends ListeningBitCommand<Ack> {
final PlanFragment fragment;
- public SendFragment(PlanFragment fragment) {
- super();
+ /**
+ * @param listener passed to the ListeningBitCommand base class
+ * @param fragment plan fragment this command will send
+ */
+ public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) {
+ super(listener);
this.fragment = fragment;
}
@Override
- public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
- return connection.sendFragment(fragment);
- }
-
- }
-
-
-
-
- private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
-
- public void addLightListener(RpcOutcomeListener<T> outcomeListener){
- this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
- }
-
- public BitCommand() {
- super(SettableFuture.<T> create());
- }
-
- public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
-
- public final void run() {
-
- try {
-
- BitConnection connection = manager.getConnection(0);
- assert connection != null : "The connection manager should never return a null connection. Worse case, it should throw an exception.";
- CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
- rpc.addListener(new FutureBridge<T>((SettableFuture<T>) delegate(), rpc), MoreExecutors.sameThreadExecutor());
- } catch (RpcException ex) {
- ((SettableFuture<T>) delegate()).setException(ex);
- }
-
- }
-
- @Override
- protected RpcException mapException(Exception e) {
- Throwable t = e;
- if (e instanceof ExecutionException) {
- t = e.getCause();
- }
- if (t instanceof RpcException) return (RpcException) t;
- return new RpcException(t);
- }
-
- public class RpcOutcomeListenerWrapper implements Runnable{
- final RpcOutcomeListener<T> inner;
-
- public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
- this.inner = inner;
- }
-
- @Override
- public void run() {
- try{
- inner.success(BitCommand.this.checkedGet());
- }catch(RpcException e){
- inner.failed(e);
- }
- }
- }
-
- @Override
- public String toString() {
- return "BitCommand ["+this.getClass().getSimpleName()+"]";
- }
-
-
-
- }
-
- private class FutureBridge<T> implements Runnable {
- final SettableFuture<T> out;
- final CheckedFuture<T, RpcException> in;
-
- public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
- super();
- this.out = out;
- this.in = in;
- }
-
- @Override
- public void run() {
- try {
- out.set(in.checkedGet());
- } catch (RpcException ex) {
- out.setException(ex);
- }
+ // Sends the plan fragment and expects an Ack; the outcome goes to outcomeListener.
+ // NOTE(review): REQ_INIATILIZE_FRAGMENT presumably mirrors the spelling of the RpcType constant — check the proto before renaming.
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
new file mode 100644
index 0000000..8afbc33
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+
+/**
+ * Registry holding one BitConnectionManager per remote DrillbitEndpoint. Managers are created on first
+ * request and then reused. The local endpoint has to be supplied through setEndpoint before managers are requested.
+ */
+public class ConnectionManagerRegistry implements Iterable<BitConnectionManager>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
+
+ private final ConcurrentMap<DrillbitEndpoint, BitConnectionManager> registry = Maps.newConcurrentMap();
+
+ private final BitComHandler handler;
+ private final BootStrapContext context;
+ private final ListenerPool listenerPool;
+ // Set after construction through setEndpoint; volatile so the assignment is visible to other threads.
+ private volatile DrillbitEndpoint localEndpoint;
+
+ public ConnectionManagerRegistry(BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+ super();
+ this.handler = handler;
+ this.context = context;
+ this.listenerPool = listenerPool;
+ }
+
+ /**
+ * Returns the connection manager for the given endpoint, creating and registering one if none exists yet.
+ * When two callers race, putIfAbsent decides the winner and both receive the registered instance.
+ * The local endpoint precondition is an assert, so it is only checked when assertions are enabled.
+ */
+ public BitConnectionManager getConnectionManager(DrillbitEndpoint endpoint){
+ assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
+ BitConnectionManager m = registry.get(endpoint);
+ if(m == null){
+ m = new BitConnectionManager(endpoint, localEndpoint, handler, context, listenerPool);
+ BitConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+ // Another caller registered a manager first: use theirs and drop the one just created.
+ if(m2 != null) m = m2;
+ }
+
+ return m;
+ }
+
+ // Iterates over the values view of the underlying concurrent map.
+ @Override
+ public Iterator<BitConnectionManager> iterator() {
+ return registry.values().iterator();
+ }
+
+ // Records the local endpoint that is passed to every manager created afterwards.
+ public void setEndpoint(DrillbitEndpoint endpoint){
+ this.localEndpoint = endpoint;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
new file mode 100644
index 0000000..fa3b518
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Base class for bit commands whose result is exposed as a future. The RPC outcome, or a failure to
+ * establish the connection, completes an internal SettableFuture that callers observe through getFuture().
+ */
+public abstract class FutureBitCommand<T extends MessageLite> implements BitCommand<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
+
+ protected final SettableFuture<T> settableFuture;
+ private final RpcCheckedFuture<T> parentFuture;
+
+ public FutureBitCommand() {
+ this.settableFuture = SettableFuture.create();
+ this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+ }
+
+ /**
+ * Issues the actual RPC on the given connection.
+ *
+ * @param outcomeListener listener the implementation passes to the send call so the result reaches the future
+ * @param connection connection to send on
+ */
+ public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+ // Runs the RPC with a listener that completes settableFuture.
+ @Override
+ public void connectionAvailable(BitConnection connection) {
+
+ doRpcCall(new DeferredRpcOutcome(), connection);
+ }
+
+ // A freshly established connection is handled exactly like an already available one.
+ @Override
+ public void connectionSucceeded(BitConnection connection) {
+ connectionAvailable(connection);
+ }
+
+ // Bridges the RPC outcome callbacks onto settableFuture.
+ private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+ @Override
+ public void failed(RpcException ex) {
+ settableFuture.setException(ex);
+ }
+
+ @Override
+ public void success(T value) {
+ settableFuture.set(value);
+ }
+
+ }
+
+ // Returns the checked-future wrapper created around settableFuture in the constructor.
+ public DrillRpcFuture<T> getFuture() {
+ return parentFuture;
+ }
+
+ // A connection failure fails the future with an RpcException that carries the failure type in its message.
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ settableFuture.setException(RpcException.mapException(
+ String.format("Command failed while establishing connection. Failure type %s.", type), t));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
index 8f299d2..84dba85 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -22,32 +22,35 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.work.foreman.FragmentStatusListener;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
public class ListenerPool {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
- private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+ private final ConcurrentMap<QueryId, FragmentStatusListener> listeners;
+ /**
+ * @param par passed to ConcurrentHashMap as its concurrencyLevel argument (initial capacity 16, load factor 0.75)
+ */
public ListenerPool(int par){
- listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+ listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(16, 0.75f, par);
}
+ /**
+ * Removes the status listener registered for the query that the given fragment handle belongs to.
+ * Does nothing if no listener is registered for that query.
+ *
+ * @param handle fragment handle whose query id identifies the listener
+ */
public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+ logger.debug("Removing framgent status listener for handle {}.", handle);
- listeners.remove(handle);
+ // The map is keyed by QueryId (see addFragmentStatusListener); removing by the FragmentHandle itself never matches an entry.
+ listeners.remove(handle.getQueryId());
}
+ /**
+ * Registers a status listener under the query id of the given fragment handle. Because the key is the
+ * query id, at most one listener can be registered per query.
+ *
+ * @throws RpcException if a listener is already registered for that query id
+ */
public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
- FragmentStatusListener old = listeners.putIfAbsent(handle, listener);
+ logger.debug("Adding framgent status listener for handle {}.", handle);
+ FragmentStatusListener old = listeners.putIfAbsent(handle.getQueryId(), listener);
if(old != null) throw new RpcException("Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another.");
}
public void status(FragmentStatus status){
- FragmentStatusListener l = listeners.get(status.getHandle());
+ FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
if(l == null){
- logger.info("A fragment message arrived but there was no registered listener for that message.");
+
+ logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", status.getHandle());
return;
}else{
l.statusUpdate(status);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
new file mode 100644
index 0000000..90db6a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Base class for bit commands whose result is reported to a caller-supplied RpcOutcomeListener. Both the
+ * RPC outcome and a failure to establish the connection are delivered to that listener.
+ */
+public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningBitCommand.class);
+
+ private final RpcOutcomeListener<T> listener;
+
+ public ListeningBitCommand(RpcOutcomeListener<T> listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * Issues the actual RPC on the given connection.
+ *
+ * @param outcomeListener listener the implementation passes to the send call so the result reaches the caller's listener
+ * @param connection connection to send on
+ */
+ public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+ // Runs the RPC with a listener that forwards to the caller-supplied listener.
+ @Override
+ public void connectionAvailable(BitConnection connection) {
+
+ doRpcCall(new DeferredRpcOutcome(), connection);
+ }
+
+ // A freshly established connection is handled exactly like an already available one.
+ @Override
+ public void connectionSucceeded(BitConnection connection) {
+ connectionAvailable(connection);
+ }
+
+ // Forwards both outcome callbacks unchanged to the caller-supplied listener.
+ private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+ @Override
+ public void failed(RpcException ex) {
+ listener.failed(ex);
+ }
+
+ @Override
+ public void success(T value) {
+ listener.success(value);
+ }
+
+ }
+
+
+ // A connection failure is reported through listener.failed with an RpcException that carries the failure type in its message.
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ listener.failed(RpcException.mapException(
+ String.format("Command failed while establishing connection. Failure type %s.", type), t));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 3df88b7..779085c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -45,5 +45,12 @@ public class QueryResultBatch {
public boolean hasData(){
return data != null;
}
+
+ /** Renders the header and data fields through string concatenation; data is printed as "null" when absent. */
+ @Override
+ public String toString() {
+ return "QueryResultBatch [header=" + header + ", data=" + data + "]";
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
new file mode 100644
index 0000000..0aa7c86
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Routes incoming query result batches to the {@link UserResultsListener} registered for their query.
+ *
+ * <p>The normal ordering of events is:
+ * <ol>
+ * <li>the query is submitted,</li>
+ * <li>the {@link QueryId} comes back as the response to the submission, and</li>
+ * <li>result batches for that query start to arrive.</li>
+ * </ol>
+ * Step 3 can happen before step 2. In that case the early batches are parked in a {@link BufferingListener}
+ * registered under the query id; once the id arrives, the buffered batches are handed to the user's listener and the
+ * user's listener takes the buffer's place in the map.
+ */
+public class QueryResultHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+  /** Listener currently registered for each query id: either the user's listener or a temporary buffer. */
+  private final ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+  /**
+   * Wraps a user listener so it can be used as the outcome listener of the query submission RPC.
+   *
+   * @param listener the user's listener for submission failure and result batches
+   * @return an outcome listener that registers {@code listener} under the returned query id on success, or
+   *         reports the submission failure to it
+   */
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener) {
+    return new SubmissionListener(listener);
+  }
+
+  /**
+   * Handles one result message: parses the header and delivers the batch to the listener registered for its query,
+   * buffering it if no listener has been registered yet.
+   *
+   * @param pBody protobuf body, parsed as a {@link QueryResult}
+   * @param dBody data body that accompanies the header
+   * @throws RpcException if the header cannot be read from {@code pBody}
+   */
+  public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
+    final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    UserResultsListener l = resultsListener.get(result.getQueryId());
+    if (l != null) {
+      l.resultArrived(batch);
+      if (result.getIsLastChunk()) {
+        // Two-argument remove: only drop the entry if it still points at the listener we just delivered to.
+        resultsListener.remove(result.getQueryId(), l);
+      }
+    } else {
+      logger.debug("Results listener not available, creating a buffering listener.");
+      // manage race condition where we start getting results before we receive the queryid back.
+      BufferingListener bl = new BufferingListener();
+      l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+      if (l != null) {
+        // Somebody registered a listener between our get() and putIfAbsent(); deliver to theirs.
+        l.resultArrived(batch);
+      } else {
+        bl.resultArrived(batch);
+      }
+    }
+  }
+
+  /**
+   * Temporary listener that queues batches until the real listener is known. After {@link #transferTo} has run,
+   * any batch that still reaches this object is forwarded directly to the real listener.
+   */
+  private static class BufferingListener implements UserResultsListener {
+
+    private final ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile UserResultsListener output;
+
+    /**
+     * Hands every buffered batch to {@code l}, in arrival order, and makes {@code l} the forwarding target for
+     * later batches. Holds this object's monitor so no batch can be queued while the hand-over is in progress.
+     *
+     * @param l the listener that takes over from this buffer
+     * @return {@code true} if the final buffered batch was flagged as the last chunk of the query
+     */
+    public boolean transferTo(UserResultsListener l) {
+      synchronized (this) {
+        output = l;
+        boolean last = false;
+        for (QueryResultBatch r : results) {
+          l.resultArrived(r);
+          last = r.getHeader().getIsLastChunk();
+        }
+        return last;
+      }
+    }
+
+    /** Queues the batch, or forwards it if a real listener has already taken over. */
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      synchronized (this) {
+        if (output == null) {
+          this.results.add(result);
+        } else {
+          output.resultArrived(result);
+        }
+      }
+    }
+
+    /** Not supported: a buffer only exists for queries whose results are already arriving. */
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+
+  }
+
+  /**
+   * Outcome listener for the query submission RPC. On success it registers the user's listener under the returned
+   * query id, draining any {@link BufferingListener} that was created by early results.
+   */
+  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+    private final UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    /** Reports the failed submission to the user's listener. */
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    /**
+     * Registers the user's listener for {@code queryId}.
+     *
+     * @param queryId id assigned to the submitted query
+     * @throws IllegalStateException if a non-buffering listener is already registered for the id, or if the
+     *         buffering listener was swapped out of the map while its contents were being transferred
+     */
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully. Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+      if (oldListener == null) {
+        return;
+      }
+
+      // We already received results by the time we got the query id back. Transfer the buffered batches to the
+      // user's listener; transferTo() locks the buffer against reception of additional results during the
+      // transition.
+      logger.debug("Unable to place user results listener, buffering listener was already in place.");
+      if (!(oldListener instanceof BufferingListener)) {
+        throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+      }
+
+      // The buffer stays in the map while it is drained, so batches arriving meanwhile are still routed through it.
+      boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+      if (all) {
+        // Every result, including the last chunk, has been delivered: drop the entry. The map is keyed by
+        // QueryId, so the entry has to be removed by (queryId, value); passing the listener as the key never
+        // matches and would leave the buffer in the map.
+        resultsListener.remove(queryId, oldListener);
+      } else {
+        boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+        if (!replaced) {
+          throw new IllegalStateException("Buffering listener was no longer registered for the query.");
+        }
+      }
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5d2e799..ad44ff2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -21,11 +21,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -36,115 +31,27 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.BasicClientWithConnection;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
import com.google.protobuf.MessageLite;
-public class UserClient extends BasicClientWithConnection<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
- private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+ private final QueryResultHandler queryResultHandler = new QueryResultHandler();
public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
- }
-
- public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
- this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
- return resultsListener.getFuture();
+ super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
}
- public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
- return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
- }
-
- private class BufferingListener extends UserResultsListener {
-
- private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private volatile UserResultsListener output;
-
- public boolean transferTo(UserResultsListener l) {
- lock.writeLock().lock();
- output = l;
- boolean last = false;
- for (QueryResultBatch r : results) {
- l.resultArrived(r);
- last = r.getHeader().getIsLastChunk();
- }
- if (future.isDone()) {
- l.set();
- }
- return last;
- }
-
- @Override
- public void resultArrived(QueryResultBatch result) {
- logger.debug("Result arrvied.");
- lock.readLock().lock();
- try {
- if (output == null) {
- this.results.add(result);
- } else {
- output.resultArrived(result);
- }
-
- } finally {
- lock.readLock().unlock();
- }
-
- }
-
- @Override
- public void submissionFailed(RpcException ex) {
- throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
- }
-
+ public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
+ send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
- private class SubmissionListener extends RpcOutcomeListener<QueryId> {
- private UserResultsListener listener;
-
- public SubmissionListener(UserResultsListener listener) {
- super();
- this.listener = listener;
- }
-
- @Override
- public void failed(RpcException ex) {
- listener.submissionFailed(ex);
- }
-
- @Override
- public void success(QueryId queryId) {
- logger.debug("Received QueryId {} succesfully. Adding listener {}", queryId, listener);
- UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
- // we need to deal with the situation where we already received results by the time we got the query id back. In
- // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
- // results during the transition
- if (oldListener != null) {
- logger.debug("Unable to place user results listener, buffering listener was already in place.");
- if (oldListener instanceof BufferingListener) {
- resultsListener.remove(oldListener);
- boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
- // simply remove the buffering listener if we already have the last response.
- if (all) {
- resultsListener.remove(oldListener);
- } else {
- boolean replaced = resultsListener.replace(queryId, oldListener, listener);
- if (!replaced) throw new IllegalStateException();
- }
- } else {
- throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
- }
- }
-
- }
-
+ public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
+ UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
+ this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort());
}
@Override
@@ -165,29 +72,7 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
switch (rpcType) {
case RpcType.QUERY_RESULT_VALUE:
- final QueryResult result = get(pBody, QueryResult.PARSER);
- final QueryResultBatch batch = new QueryResultBatch(result, dBody);
- UserResultsListener l = resultsListener.get(result.getQueryId());
-// logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
- if (l != null) {
-// logger.debug("Results listener available, using existing.");
- l.resultArrived(batch);
- if (result.getIsLastChunk()) {
- resultsListener.remove(result.getQueryId(), l);
- l.set();
- }
- } else {
- logger.debug("Results listener not available, creating a buffering listener.");
- // manage race condition where we start getting results before we receive the queryid back.
- BufferingListener bl = new BufferingListener();
- l = resultsListener.putIfAbsent(result.getQueryId(), bl);
- if (l != null) {
- l.resultArrived(batch);
- } else {
- bl.resultArrived(batch);
- }
- }
-
+ queryResultHandler.batchArrived(pBody, dBody);
return new Response(RpcType.ACK, Ack.getDefaultInstance());
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
@@ -196,18 +81,16 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
}
@Override
- protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
- return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+ protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+ logger.debug("Handling handshake from bit to user. {}", inbound);
+ // Reject a server that speaks a different RPC version. In the message, "expected" is this client's
+ // version and "actual" is the version the server reported.
+ if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
+ throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", UserRpcConfig.RPC_VERSION,
+ inbound.getRpcVersion()));
- @Override
- protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
- logger.debug("Handling handshake from bit to user. {}", inbound);
- if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
- inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
- }
+ }
- };
+ @Override
+ protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3ce14f0..b1dbfe8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,17 +24,8 @@ import org.apache.drill.exec.rpc.RpcException;
import com.google.common.util.concurrent.SettableFuture;
-public abstract class UserResultsListener {
- SettableFuture<Void> future = SettableFuture.create();
+public interface UserResultsListener {
- final void set(){
- future.set(null);
- }
-
- Future<Void> getFuture(){
- return future;
- }
-
public abstract void submissionFailed(RpcException ex);
public abstract void resultArrived(QueryResultBatch result);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 406afc4..908af61 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -26,16 +26,15 @@ import io.netty.channel.EventLoopGroup;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.work.user.UserWorker;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -100,8 +99,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
super(channel);
}
- public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
- return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+ logger.debug("Sending result to client with {}", result);
+ send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
}
}
@@ -112,7 +112,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+ protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(UserClientConnection connection) {
return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3c4d9af..ed13748 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
public BootStrapContext(DrillConfig config) {
super();
this.config = config;
- this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+ this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
this.allocator = BufferAllocator.getAllocator(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 0337a68..199768f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.LocalCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.LocalClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
public class RemoteServiceSet implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -37,6 +38,7 @@ public class RemoteServiceSet implements Closeable{
this.coordinator = coordinator;
}
+
public DistributedCache getCache() {
return cache;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d6d3b9c..b07f274 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -53,12 +53,12 @@ public class ServiceEngine implements Closeable{
public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
- int bitPort = bitCom.start();
- return DrillbitEndpoint.newBuilder()
+ DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
.setAddress(InetAddress.getLocalHost().getHostAddress())
- .setBitPort(bitPort)
.setUserPort(userPort)
.build();
+
+ return bitCom.start(partialEndpoint);
}
public BitCom getBitCom(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
index f6a9786..9a72845 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -18,10 +18,9 @@
package org.apache.drill.exec.work;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+public abstract class EndpointListener<RET, V> extends BaseRpcOutcomeListener<RET>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
protected final DrillbitEndpoint endpoint;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
index 2900d99..554b398 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -65,6 +65,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
@Override
public void run() {
+ logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
return;
@@ -76,7 +77,12 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
try{
while(state.get() == FragmentState.RUNNING_VALUE){
if(!root.next()){
- updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ if(context.isFailed()){
+ updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+ }else{
+ updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ }
+
}
}
@@ -90,7 +96,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
}finally{
t.stop();
}
-
+ logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
}
private void internalFail(Throwable excep){
[3/3] git commit: Clean up threading of client/server. Utilize
command pattern for BitCom stuff to abstract away connection failures. Works
on one bit single exchange remote query now. Next up,
two bit single exchange query.
Posted by ja...@apache.org.
Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8db98ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8db98ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8db98ad
Branch: refs/heads/execwork
Commit: b8db98ad7c159db3cf41a3866ff53013f87964b4
Parents: e57a8d6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue May 21 18:38:56 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 21 18:38:56 2013 -0700
----------------------------------------------------------------------
.../drill/common/graph/AdjacencyListBuilder.java | 2 +-
.../org/apache/drill/exec/cache/LocalCache.java | 2 +
.../org/apache/drill/exec/client/DrillClient.java | 77 ++++--
.../drill/exec/coord/LocalClusterCoordinator.java | 7 +-
.../org/apache/drill/exec/ops/FragmentContext.java | 19 ++-
.../org/apache/drill/exec/ops/QueryContext.java | 4 +
.../exec/physical/config/MockRecordReader.java | 1 -
.../drill/exec/physical/config/RandomReceiver.java | 5 -
.../apache/drill/exec/physical/config/Screen.java | 2 +-
.../drill/exec/physical/impl/ScreenCreator.java | 79 +++++-
.../exec/physical/impl/SingleSenderCreator.java | 41 +++-
.../drill/exec/physical/impl/WireRecordBatch.java | 8 +-
.../impl/materialize/QueryWritableBatch.java | 8 +
.../impl/materialize/VectorRecordMaterializer.java | 11 +-
.../drill/exec/planner/fragment/Materializer.java | 8 +-
.../exec/planner/fragment/SimpleParallelizer.java | 4 +-
.../exec/planner/fragment/StatsCollector.java | 2 +-
.../apache/drill/exec/record/RawFragmentBatch.java | 5 +
.../drill/exec/rpc/AbstractHandshakeHandler.java | 5 +-
.../drill/exec/rpc/BaseRpcOutcomeListener.java | 32 +++
.../org/apache/drill/exec/rpc/BasicClient.java | 176 ++++++++------
.../drill/exec/rpc/BasicClientWithConnection.java | 9 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 7 +-
.../rpc/ChannelListenerWithCoordinationId.java | 25 ++
.../apache/drill/exec/rpc/CoordinationQueue.java | 96 +++++++--
.../org/apache/drill/exec/rpc/DrillRpcFuture.java | 2 -
.../apache/drill/exec/rpc/DrillRpcFutureImpl.java | 70 +-----
.../java/org/apache/drill/exec/rpc/RpcBus.java | 88 ++++---
.../apache/drill/exec/rpc/RpcCheckedFuture.java | 33 +++
.../drill/exec/rpc/RpcConnectionHandler.java | 28 +++
.../org/apache/drill/exec/rpc/RpcException.java | 13 +
.../java/org/apache/drill/exec/rpc/RpcOutcome.java | 26 ++
.../apache/drill/exec/rpc/RpcOutcomeListener.java | 7 +-
.../exec/rpc/ZeroCopyProtobufLengthDecoder.java | 2 +-
.../org/apache/drill/exec/rpc/bit/BitClient.java | 52 ++---
.../java/org/apache/drill/exec/rpc/bit/BitCom.java | 8 +-
.../org/apache/drill/exec/rpc/bit/BitComImpl.java | 129 ++---------
.../org/apache/drill/exec/rpc/bit/BitCommand.java | 28 +++
.../apache/drill/exec/rpc/bit/BitConnection.java | 79 +-----
.../drill/exec/rpc/bit/BitConnectionManager.java | 175 +++++++++++---
.../org/apache/drill/exec/rpc/bit/BitServer.java | 60 ++++-
.../org/apache/drill/exec/rpc/bit/BitTunnel.java | 187 ++++-----------
.../exec/rpc/bit/ConnectionManagerRegistry.java | 73 ++++++
.../drill/exec/rpc/bit/FutureBitCommand.java | 78 ++++++
.../apache/drill/exec/rpc/bit/ListenerPool.java | 15 +-
.../drill/exec/rpc/bit/ListeningBitCommand.java | 73 ++++++
.../drill/exec/rpc/user/QueryResultBatch.java | 7 +
.../drill/exec/rpc/user/QueryResultHandler.java | 153 ++++++++++++
.../org/apache/drill/exec/rpc/user/UserClient.java | 153 ++-----------
.../drill/exec/rpc/user/UserResultsListener.java | 11 +-
.../org/apache/drill/exec/rpc/user/UserServer.java | 10 +-
.../apache/drill/exec/server/BootStrapContext.java | 2 +-
.../apache/drill/exec/server/RemoteServiceSet.java | 2 +
.../apache/drill/exec/service/ServiceEngine.java | 6 +-
.../apache/drill/exec/work/EndpointListener.java | 5 +-
.../org/apache/drill/exec/work/FragmentRunner.java | 10 +-
.../org/apache/drill/exec/work/WorkManager.java | 7 +-
.../exec/work/batch/AbstractFragmentCollector.java | 7 +-
.../drill/exec/work/batch/BatchCollector.java | 3 +-
.../drill/exec/work/batch/BitComHandlerImpl.java | 9 +-
.../drill/exec/work/batch/IncomingBuffers.java | 21 +-
.../drill/exec/work/batch/MergingCollector.java | 5 +-
.../exec/work/batch/PartitionedCollector.java | 1 +
.../exec/work/batch/UnlmitedRawBatchBuffer.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 22 +-
.../exec/work/foreman/RunningFragmentManager.java | 8 +-
.../work/fragment/IncomingFragmentHandler.java | 2 +-
.../exec/work/fragment/LocalFragmentHandler.java | 4 +-
.../exec/work/fragment/RemoteFragmentHandler.java | 4 +-
.../exec/physical/impl/DistributedFragmentRun.java | 17 +-
.../org/apache/drill/exec/pop/CheckFragmenter.java | 21 +--
.../org/apache/drill/exec/pop/FragmentChecker.java | 41 +++-
.../org/apache/drill/exec/server/TestBitRpc.java | 26 ++-
.../test/resources/physical_single_exchange.json | 1 -
74 files changed, 1498 insertions(+), 923 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
index 1668477..4a385ce 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
@@ -57,7 +57,7 @@ import java.util.Map;
}
public AdjacencyList<V> getAdjacencyList() {
- logger.debug("Values; {}", ops.values().toArray());
+// logger.debug("Values; {}", ops.values().toArray());
AdjacencyList<V> a = new AdjacencyList<V>();
for (AdjacencyList<V>.Node from : ops.values()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index ddb2a02..b656f2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -43,11 +43,13 @@ public class LocalCache implements DistributedCache {
@Override
public PlanFragment getFragment(FragmentHandle handle) {
+ logger.debug("looking for fragment with handle: {}", handle);
return handles.get(handle);
}
@Override
public void storeFragment(PlanFragment fragment) {
+ logger.debug("Storing fragment: {}", fragment);
handles.put(fragment.getHandle(), fragment);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bb7f77e..c35e834 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -30,22 +30,23 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.proto.UserProtos.RpcType;
-import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.rpc.user.UserRpcConfig;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
@@ -75,9 +76,6 @@ public class DrillClient implements Closeable{
}
-
-
-
/**
* Connects the client to a Drillbit server
*
@@ -97,7 +95,9 @@ public class DrillClient implements Closeable{
this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
try {
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
- this.client.connect(endpoint);
+ FutureHandler f = new FutureHandler();
+ this.client.connect(f, endpoint);
+ f.checkedGet();
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -120,34 +120,63 @@ public class DrillClient implements Closeable{
* @throws RpcException
*/
public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
- try {
- ListHoldingResultsListener listener = new ListHoldingResultsListener();
- Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
- f.get();
- if(listener.ex != null){
- throw listener.ex;
- }else{
- return listener.results;
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new RpcException(e);
- }
+ ListHoldingResultsListener listener = new ListHoldingResultsListener();
+ client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
+ return listener.getResults();
+
}
- private class ListHoldingResultsListener extends UserResultsListener{
- private RpcException ex;
+ private class ListHoldingResultsListener implements UserResultsListener {
private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+ private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
@Override
public void submissionFailed(RpcException ex) {
logger.debug("Submission failed.", ex);
- this.ex = ex;
+ future.setException(ex);
}
@Override
public void resultArrived(QueryResultBatch result) {
logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
+ if(result.getHeader().getIsLastChunk()){
+ future.set(results);
+ }
+ }
+
+ public List<QueryResultBatch> getResults() throws RpcException{
+ try{
+ return future.get();
+ }catch(Throwable t){
+ throw RpcException.mapException(t);
+ }
+ }
+ }
+
+ private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{
+
+ protected FutureHandler() {
+ super( SettableFuture.<Void>create());
+ }
+
+ @Override
+ public void connectionSucceeded(ServerConnection connection) {
+ getInner().set(null);
+ }
+
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ getInner().setException(new RpcException(String.format("Failure connecting to server. Failure of type %s.", type.name()), t));
+ }
+
+ private SettableFuture<Void> getInner(){
+ return (SettableFuture<Void>) delegate();
+ }
+
+ @Override
+ protected RpcException mapException(Exception e) {
+ return RpcException.mapException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
index 43a5430..f7b3549 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -29,17 +29,16 @@ import com.google.common.collect.Maps;
public class LocalClusterCoordinator extends ClusterCoordinator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
- private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints;
+ private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
@Override
public void close() throws IOException {
- endpoints = null;
+ endpoints.clear();
}
@Override
public void start(long millis) throws Exception {
logger.debug("Local Cluster Coordinator started.");
- endpoints = Maps.newConcurrentMap();
}
@Override
@@ -52,6 +51,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator{
@Override
public void unregister(RegistrationHandle handle) {
+ if(handle == null) return;
+
endpoints.remove(handle);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e64453c..33707a0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -52,7 +52,9 @@ public class FragmentContext {
private final FragmentHandle handle;
private final UserClientConnection connection;
private final IncomingBuffers buffers;
-
+ private volatile Throwable failureCause;
+ private volatile boolean failed = false;
+
public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
@@ -65,9 +67,10 @@ public class FragmentContext {
}
public void fail(Throwable cause) {
-
+ logger.debug("Fragment Context received failure. {}", cause);
+ failed = true;
+ failureCause = cause;
}
-
public DrillbitContext getDrillbitContext(){
return context;
@@ -107,4 +110,14 @@ public class FragmentContext {
public IncomingBuffers getBuffers(){
return buffers;
}
+
+ public Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ public boolean isFailed(){
+ return failed;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fd24deb..1c251b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.server.DrillbitContext;
public class QueryContext {
@@ -57,4 +58,7 @@ public class QueryContext {
return drillbitContext.getPlanReader();
}
+ public BitCom getBitCom(){
+ return drillbitContext.getBitCom();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index eaaeaa3..6a1eba4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -76,7 +76,6 @@ public class MockRecordReader implements RecordReader {
int batchRecordCount = 250000 / estimateRowSize;
for (int i = 0; i < config.getTypes().length; i++) {
- logger.debug("Adding field {} of type {}", i, config.getTypes()[i]);
valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
output.addField(i, valueVectors[i]);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
index ed41586..6772fb0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -72,11 +72,6 @@ public class RandomReceiver extends AbstractReceiver{
return new Size(1,1);
}
- @Override
- public int getOppositeMajorFragmentId() {
- return 0;
- }
-
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 86a201d..688c6b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -64,7 +64,7 @@ public class Screen extends AbstractStore implements Root{
// didn't get screwed up.
if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
DrillbitEndpoint endpoint = endpoints.iterator().next();
- logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+// logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
if (!endpoint.equals(this.endpoint)) {
throw new PhysicalOperatorSetupException(String.format(
"A Screen operator can only be assigned to its home node. Expected endpoint %s, Actual endpoint: %s",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c0711db..c20538d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,21 +17,32 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import io.netty.buffer.ByteBuf;
+
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
import com.google.common.base.Preconditions;
public class ScreenCreator implements RootCreator<Screen>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
-
+
+
@Override
public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
Preconditions.checkArgument(children.size() == 1);
@@ -40,7 +51,9 @@ public class ScreenCreator implements RootCreator<Screen>{
private static class ScreenRoot implements RootExec{
-
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+ volatile boolean ok = true;
+
final RecordBatch incoming;
final FragmentContext context;
final UserClientConnection connection;
@@ -56,25 +69,53 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public boolean next() {
+ if(!ok){
+ stop();
+ return false;
+ }
+
IterOutcome outcome = incoming.next();
- boolean isLast = false;
+ logger.debug("Screen Outcome {}", outcome);
switch(outcome){
- case NONE:
- case STOP:
- connection.sendResult(materializer.convertNext(true));
- context.batchesCompleted.inc(1);
- context.recordsCompleted.inc(incoming.getRecordCount());
+ case STOP: {
+ QueryResult header1 = QueryResult.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+
+ connection.sendResult(listener, batch1);
+ return false;
+ }
+ case NONE: {
+ if(materializer == null){
+ // receive no results.
+ context.batchesCompleted.inc(1);
+ context.recordsCompleted.inc(incoming.getRecordCount());
+ QueryResult header2 = QueryResult.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch batch2 = new QueryWritableBatch(header2);
+ connection.sendResult(listener, batch2);
+ }else{
+ connection.sendResult(listener, materializer.convertNext(true));
+ }
return false;
-
+ }
case OK_NEW_SCHEMA:
materializer = new VectorRecordMaterializer(context, incoming);
// fall through.
- // fall through
case OK:
- connection.sendResult(materializer.convertNext(false));
context.batchesCompleted.inc(1);
context.recordsCompleted.inc(incoming.getRecordCount());
- return !isLast;
+ connection.sendResult(listener, materializer.convertNext(false));
+ return true;
default:
throw new UnsupportedOperationException();
}
@@ -85,6 +126,20 @@ public class ScreenCreator implements RootCreator<Screen>{
incoming.kill();
}
+ private SendListener listener = new SendListener();
+
+ private class SendListener extends BaseRpcOutcomeListener<Ack>{
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.error("Failure while sending data to user.", ex);
+ ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+ ok = false;
+ }
+
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 60c2d78..b7d4c7e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -23,9 +23,12 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.bit.BitTunnel;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -45,9 +48,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
private FragmentHandle handle;
private int recMajor;
private FragmentContext context;
+ private volatile boolean ok = true;
public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
- logger.debug("Creating single sender root exec base on config: {}", config);
this.incoming = batch;
this.handle = context.getHandle();
this.recMajor = config.getOppositeMajorFragmentId();
@@ -57,20 +60,24 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public boolean next() {
+ if(!ok){
+ incoming.kill();
+
+ return false;
+ }
IterOutcome out = incoming.next();
logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
- FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
- tunnel.sendRecordBatch(context, b2);
+ FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+ tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
return false;
-
case OK:
case OK_NEW_SCHEMA:
- FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
- tunnel.sendRecordBatch(context, batch);
+ FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+ tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
return true;
case NOT_YET:
@@ -81,9 +88,31 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void stop() {
+ ok = false;
}
+ private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
+
+ @Override
+ public void failed(RpcException ex) {
+ context.fail(ex);
+ stop();
+ }
+
+ @Override
+ public void success(Ack value) {
+ if(value.getOk()) return;
+
+ logger.error("Downstream fragment was not accepted. Stopping future sends.");
+ // if we didn't get ack ok, we'll need to kill the query.
+ context.fail(new RpcException("A downstream fragment batch wasn't accepted. This fragment thus fails."));
+ stop();
+ }
+
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fc7f833..b41b0cd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -38,6 +38,7 @@ public class WireRecordBatch implements RecordBatch{
private RecordBatchLoader batchLoader;
private RawFragmentBatchProvider fragProvider;
private FragmentContext context;
+ private BatchSchema schema;
public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
@@ -53,7 +54,7 @@ public class WireRecordBatch implements RecordBatch{
@Override
public BatchSchema getSchema() {
- return null;
+ return schema;
}
@Override
@@ -73,13 +74,16 @@ public class WireRecordBatch implements RecordBatch{
@Override
public IterOutcome next() {
- RawFragmentBatch batch = this.fragProvider.getNext();
+ RawFragmentBatch batch = fragProvider.getNext();
try{
if(batch == null) return IterOutcome.NONE;
+ logger.debug("Next received batch {}", batch);
+
RecordBatchDef rbd = batch.getHeader().getDef();
boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
if(schemaChanged){
+ this.schema = batchLoader.getSchema();
return IterOutcome.OK_NEW_SCHEMA;
}else{
return IterOutcome.OK;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 187e6e9..e8ed48a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.materialize;
+import java.util.Arrays;
+
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
@@ -42,5 +44,11 @@ public class QueryWritableBatch {
public QueryResult getHeader() {
return header;
}
+
+ @Override
+ public String toString() {
+ return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index e2d2eb9..7929296 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.materialize;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
@@ -33,10 +34,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
this.queryId = context.getHandle().getQueryId();
this.batch = batch;
-
- for (MaterializedField f : batch.getSchema()) {
- logger.debug("New Field: {}", f);
- }
+ BatchSchema schema = batch.getSchema();
+ assert schema != null : "Schema must be defined.";
+
+// for (MaterializedField f : batch.getSchema()) {
+// logger.debug("New Field: {}", f);
+// }
}
public QueryWritableBatch convertNext(boolean isLast) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 9fee586..da71271 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -41,13 +41,13 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
// this is a sending exchange.
PhysicalOperator child = exchange.getChild().accept(this, iNode);
PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
- logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
+// logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
return materializedSender;
}else{
// receiving exchange.
PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
- logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
+// logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
return materializedReceiver;
}
}
@@ -63,7 +63,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
try {
PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
- logger.debug("New materialized store node {} with child {}", o, child);
+// logger.debug("New materialized store node {} with child {}", o, child);
return o;
} catch (PhysicalOperatorSetupException e) {
throw new FragmentSetupException("Failure while generating a specific Store materialization.");
@@ -72,7 +72,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
- logger.debug("Visiting catch all: {}", op);
+// logger.debug("Visiting catch all: {}", op);
List<PhysicalOperator> children = Lists.newArrayList();
for(PhysicalOperator child : op){
children.add(child.accept(this, iNode));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index fc03a23..8adb447 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -145,7 +145,7 @@ public class SimpleParallelizer {
// figure out width.
int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
float diskCost = stats.getDiskCost();
- logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+// logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
// TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
// of tasks or the maximum width of the fragment.
@@ -154,7 +154,7 @@ public class SimpleParallelizer {
}
if (width < 1) width = 1;
- logger.debug("Setting width {} on fragment {}", width, wrapper);
+// logger.debug("Setting width {} on fragment {}", width, wrapper);
wrapper.setWidth(width);
// figure out endpoint assignments. also informs the exchanges about their respective endpoints.
wrapper.assignEndpoints(allNodes);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index d53a78c..af8ec04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -41,7 +41,7 @@ public class StatsCollector {
Wrapper wrapper = planningSet.get(n);
n.getRoot().accept(opStatCollector, wrapper);
- logger.debug("Set stats to {}", wrapper.getStats());
+// logger.debug("Set stats to {}", wrapper.getStats());
// receivers...
for (ExchangeFragmentPair child : n) {
// get the fragment node that feeds this node.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index c244cea..4f87224 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -41,4 +41,9 @@ public class RawFragmentBatch {
return body;
}
+ @Override
+ public String toString() {
+ return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 859d385..ea591da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -25,8 +25,7 @@ import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
- ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
protected final EnumLite handshakeType;
@@ -41,7 +40,7 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
@Override
public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
- coordinationId = inbound.coordinationId;
+ this.coordinationId = inbound.coordinationId;
ctx.channel().pipeline().remove(this);
if (inbound.rpcType != handshakeType.getNumber())
throw new RpcException(String.format("Handshake failure. Expected %s[%d] but received number [%d]",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
new file mode 100644
index 0000000..1dab1c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
+
+ @Override
+ public void failed(RpcException ex) {
+ }
+
+ @Override
+ public void success(T value) {
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0ff2b9d..0afc5d0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -29,22 +29,30 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
+
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
+ extends RpcBus<T, R> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
- private Bootstrap b;
+ private final Bootstrap b;
private volatile boolean connect = false;
protected R connection;
- private EventLoopGroup eventLoop;
+ private final T handshakeType;
+ private final Class<HANDSHAKE_RESPONSE> responseClass;
+ private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
- public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
super(rpcMapping);
- this.eventLoop = eventLoopGroup;
+ this.responseClass = responseClass;
+ this.handshakeType = handshakeType;
+ this.handshakeParser = handshakeParser;
b = new Bootstrap() //
.group(eventLoopGroup) //
@@ -59,12 +67,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
logger.debug("initializing client connection.");
connection = initRemoteConnection(ch);
ch.closeFuture().addListener(getCloseHandler(connection));
-
+
ch.pipeline().addLast( //
new ZeroCopyProtobufLengthDecoder(), //
new RpcDecoder(rpcConfig.getName()), //
new RpcEncoder(rpcConfig.getName()), //
- getHandshakeHandler(), //
+ new ClientHandshakeHandler(), //
new InboundHandler(connection), //
new RpcExceptionHandler() //
);
@@ -75,26 +83,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
;
}
- protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
-
- protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
- private Class<T> responseType;
-
- public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
- super(handshakeType, parser);
- this.responseType = responseType;
- }
-
- @Override
- protected final void consumeHandshake(Channel c, T msg) throws Exception {
- validateHandshake(msg);
- queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
- }
-
- protected abstract void validateHandshake(T msg) throws Exception;
-
- }
-
+ protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
+ protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
+
protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
return new ChannelClosedHandler();
}
@@ -105,6 +96,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
"This shouldn't be used in client mode as a client only has a single connection.");
}
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
+ T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
+ }
+
protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
@@ -115,65 +111,91 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
return true;
}
- /**
- * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom. Should be cleaned up.
- */
- private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
- final SettableFuture<RECEIVE> future;
- T handshakeType;
- SEND handshakeValue;
- String host;
- int port;
- Class<RECEIVE> responseClass;
-
- public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
- super();
- assert host != null && !host.isEmpty();
- assert port > 0;
- logger.debug("Creating new handshake thread to connec to {}:{}", host, port);
- this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
- future = SettableFuture.create();
- this.handshakeType = handshakeType;
+ protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+ ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
+ b.connect(host, port).addListener(cml.connectionHandler);
+ }
+
+ private class ConnectionMultiListener {
+ private final RpcConnectionHandler<R> l;
+ private final HANDSHAKE_SEND handshakeValue;
+
+ public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
+ assert l != null;
+ assert handshakeValue != null;
+
+ this.l = l;
this.handshakeValue = handshakeValue;
- this.host = host;
- this.port = port;
- this.responseClass = responseClass;
}
- @Override
- public void run() {
- try {
- logger.debug("Starting to get client connection on host {}, port {}.", host, port);
-
- ChannelFuture f = b.connect(host, port);
- f.sync();
- if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
- connect = !connect;
- logger.debug("Client connected, sending handshake.");
- DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
- future.set(fut.checkedGet());
- logger.debug("Got bit client connection.");
- } catch (Exception e) {
- logger.debug("Failed to get client connection.", e);
- future.setException(e);
+ public final ConnectionHandler connectionHandler = new ConnectionHandler();
+ public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
+
+ /**
+ * Manages connection establishment outcomes.
+ */
+ private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
+ try {
+ future.get();
+ if (future.isSuccess()) {
+ send(handshakeSendHandler, handshakeType, handshakeValue, responseClass);
+ } else {
+ l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ }
+// logger.debug("Handshake queued for send.");
+ } catch (Exception ex) {
+ l.connectionFailed(FailureType.CONNECTION, ex);
+ }
}
}
+ /**
+ * Manages handshake outcomes.
+ */
+ private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.debug("Failure while initiating handshake", ex);
+ l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
+
+ @Override
+ public void success(HANDSHAKE_RESPONSE value) {
+// logger.debug("Handshake received. {}", value);
+ try {
+ BasicClient.this.validateHandshake(value);
+ BasicClient.this.finalizeConnection(value, connection);
+ BasicClient.this.connect = true;
+ l.connectionSucceeded(connection);
+// logger.debug("Handshake completed successfully.");
+ } catch (RpcException ex) {
+ l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
+ }
+ }
+
+ }
+
}
- protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
- SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
- RpcException {
-
-
- HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
- ht.start();
- try{
- return ht.future.get();
- }catch(Exception e){
- throw new RpcException(e);
+ private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
+
+ public ClientHandshakeHandler() {
+ super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
}
-
+
+ @Override
+ protected final void consumeHandshake(Channel c, HANDSHAKE_RESPONSE msg) throws Exception {
+ // remove the handshake information from the queue so it doesn't sit there forever.
+ RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
+ responseClass);
+ response.set(msg);
+ }
+
}
public void close() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 0e62f14..2028db6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -26,13 +26,16 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
import com.google.protobuf.Internal.EnumLite;
-public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
- public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
- super(rpcMapping, alloc, eventLoopGroup);
+ public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+ Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+ super(rpcMapping, alloc, eventLoopGroup, handshakeType, responseClass, handshakeParser);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 52bb0a2..af5d9c9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -72,7 +72,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
new ZeroCopyProtobufLengthDecoder(), //
new RpcDecoder(rpcConfig.getName()), //
new RpcEncoder(rpcConfig.getName()), //
- getHandshakeHandler(),
+ getHandshakeHandler(connection),
new InboundHandler(connection), //
new RpcExceptionHandler() //
);
@@ -88,7 +88,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
}
- protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
+ protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
@@ -104,9 +104,6 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
-
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
new file mode 100644
index 0000000..27e9dee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public interface ChannelListenerWithCoordinationId extends GenericFutureListener<ChannelFuture>{
+ public int getCoordinationId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 70142bb..9edbe11 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -17,8 +17,12 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
@@ -29,31 +33,93 @@ public class CoordinationQueue {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
- private final Map<Integer, DrillRpcFutureImpl<?>> map;
+ private final Map<Integer, RpcOutcome<?>> map;
public CoordinationQueue(int segmentSize, int segmentCount) {
- map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
+ map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
}
- void channelClosed(Exception ex) {
- for (DrillRpcFutureImpl<?> f : map.values()) {
- f.setException(ex);
+ void channelClosed(Throwable ex) {
+ if(ex != null){
+ RpcException e;
+ if(ex instanceof RpcException){
+ e = (RpcException) ex;
+ }else{
+ e = new RpcException(ex);
+ }
+ for (RpcOutcome<?> f : map.values()) {
+ f.setException(e);
+ }
}
}
- public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+ public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
int i = circularInt.getNext();
- DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
- // logger.debug("Writing to map coord {}, future {}", i, future);
+ RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
Object old = map.put(i, future);
if (old != null)
throw new IllegalStateException(
"You attempted to reuse a coordination id when the previous coordination id has not been removed. This is likely rpc future callback memory leak.");
return future;
}
+
+ private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
+ final RpcOutcomeListener<T> handler;
+ final Class<T> clazz;
+ final int coordinationId;
+
+ public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+ super();
+ this.handler = handler;
+ this.clazz = clazz;
+ this.coordinationId = coordinationId;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(!future.isSuccess()){
+ removeFromMap(coordinationId);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void set(Object value) {
+ assert clazz.isAssignableFrom(value.getClass());
+ handler.success( (T) value);
+ }
+
+ @Override
+ public void setException(Throwable t) {
+ handler.failed(RpcException.mapException(t));
+ }
+
+ @Override
+ public Class<T> getOutcomeType() {
+ return clazz;
+ }
+
+ @Override
+ public int getCoordinationId() {
+ return coordinationId;
+ }
+
+
+ }
+//
+// public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+// int i = circularInt.getNext();
+// DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
+// // logger.debug("Writing to map coord {}, future {}", i, future);
+// Object old = map.put(i, future);
+// if (old != null)
+// throw new IllegalStateException(
+// "You attempted to reuse a coordination id when the previous coordination id has not been removed. This is likely rpc future callback memory leak.");
+// return future;
+// }
- private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
- DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
+ private RpcOutcome<?> removeFromMap(int coordinationId) {
+ RpcOutcome<?> rpc = map.remove(coordinationId);
if (rpc == null) {
logger.error("Rpc is null.");
throw new IllegalStateException(
@@ -62,11 +128,11 @@ public class CoordinationQueue {
return rpc;
}
- public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
+ public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
// logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
- DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+ RpcOutcome<?> rpc = removeFromMap(coordinationId);
// logger.debug("Got rpc from map {}", rpc);
- Class<?> outcomeClass = rpc.getOutcomeClass();
+ Class<?> outcomeClass = rpc.getOutcomeType();
if (outcomeClass != clazz) {
@@ -80,7 +146,7 @@ public class CoordinationQueue {
}
@SuppressWarnings("unchecked")
- DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
+ RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
// logger.debug("Returning casted future");
return crpc;
@@ -88,7 +154,7 @@ public class CoordinationQueue {
public void updateFailedFuture(int coordinationId, RpcFailure failure) {
// logger.debug("Updating failed future.");
- DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+ RpcOutcome<?> rpc = removeFromMap(coordinationId);
rpc.setException(new RemoteRpcException(failure));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index bae947a..9033ea1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -21,6 +21,4 @@ import com.google.common.util.concurrent.CheckedFuture;
public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
-
- public void addLightListener(RpcOutcomeListener<T> outcomeListener);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index ee14eeb..d5d3a9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -22,22 +22,12 @@ import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.AbstractCheckedFuture;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
- final int coordinationId;
- private final Class<V> clazz;
-
- public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
- super(delegate);
- this.coordinationId = coordinationId;
- this.clazz = clazz;
- }
-
- public Class<V> getOutcomeClass(){
- return clazz;
+ public DrillRpcFutureImpl() {
+ super(new InnerFuture<V>());
}
/**
@@ -53,24 +43,7 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
@Override
protected RpcException mapException(Exception ex) {
- Throwable e = ex;
- while(e instanceof ExecutionException){
- e = e.getCause();
- }
- if (e instanceof RpcException) return (RpcException) e;
-
- return new RpcException(ex);
-
- }
-
- @SuppressWarnings("unchecked")
- void setValue(Object value) {
- assert clazz.isAssignableFrom(value.getClass());
- ((InnerFuture<V>) super.delegate()).setValue((V) value);
- }
-
- boolean setException(Throwable t) {
- return ((InnerFuture<V>) super.delegate()).setException(t);
+ return RpcException.mapException(ex);
}
public static class InnerFuture<T> extends AbstractFuture<T> {
@@ -85,34 +58,17 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
}
}
- public class RpcOutcomeListenerWrapper implements Runnable{
- final RpcOutcomeListener<V> inner;
-
- public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
- super();
- this.inner = inner;
- }
-
- @Override
- public void run() {
- try{
- inner.success(DrillRpcFutureImpl.this.checkedGet());
- }catch(RpcException e){
- inner.failed(e);
- }
- }
- }
-
- public void addLightListener(RpcOutcomeListener<V> outcomeListener){
- this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+ @Override
+ public void failed(RpcException ex) {
+ ( (InnerFuture<V>)delegate()).setException(ex);
}
-
-
-
- public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
- InnerFuture<V> f = new InnerFuture<V>();
- return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+
+ @Override
+ public void success(V value) {
+ ( (InnerFuture<V>)delegate()).setValue(value);
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 11764db..a680a97 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -64,6 +64,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+ DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
+ this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
+ return rpcFuture;
+ }
+
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+
+
+
assert !Arrays.asList(dataBodies).contains(null);
assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
@@ -72,14 +82,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
boolean completed = false;
try {
- // logger.debug("Seding message");
Preconditions.checkNotNull(protobufBody);
- DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
- OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+ ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+ OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
ChannelFuture channelFuture = connection.getChannel().write(m);
- channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
+ channelFuture.addListener(futureListener);
completed = true;
- return rpcFuture;
} finally {
if (!completed) {
if (pBuffer != null) pBuffer.release();
@@ -140,10 +148,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
case RESPONSE:
MessageLite m = getResponseDefaultInstance(msg.rpcType);
assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+ RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
Parser<?> parser = m.getParserForType();
Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- rpcFuture.setValue(value);
+ rpcFuture.set(value);
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
break;
@@ -162,39 +170,39 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
}
- private class Listener implements GenericFutureListener<ChannelFuture> {
-
- private int coordinationId;
- private Class<?> clazz;
-
- public Listener(int coordinationId, Class<?> clazz) {
- this.coordinationId = coordinationId;
- this.clazz = clazz;
- }
-
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- // logger.debug("Completed channel write.");
-
- if (channelFuture.isCancelled()) {
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
- rpcFuture.setException(new CancellationException("Socket operation was canceled."));
- } else if (!channelFuture.isSuccess()) {
- try {
- channelFuture.get();
- throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
- } catch (Exception e) {
- logger.error("Error occurred during Rpc", e);
- DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
- rpcFuture.setException(e);
- }
- } else {
- // send was successful. No need to modify DrillRpcFuture.
- return;
- }
- }
-
- }
+// private class Listener implements GenericFutureListener<ChannelFuture> {
+//
+// private int coordinationId;
+// private Class<?> clazz;
+//
+// public Listener(int coordinationId, Class<?> clazz) {
+// this.coordinationId = coordinationId;
+// this.clazz = clazz;
+// }
+//
+// @Override
+// public void operationComplete(ChannelFuture channelFuture) throws Exception {
+// // logger.debug("Completed channel write.");
+//
+// if (channelFuture.isCancelled()) {
+// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+// rpcFuture.setException(new CancellationException("Socket operation was canceled."));
+// } else if (!channelFuture.isSuccess()) {
+// try {
+// channelFuture.get();
+// throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
+// } catch (Exception e) {
+// logger.error("Error occurred during Rpc", e);
+// RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+// rpcFuture.setException(e);
+// }
+// } else {
+// // send was successful. No need to modify DrillRpcFuture.
+// return;
+// }
+// }
+//
+// }
public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
new file mode 100644
index 0000000..7c300d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{ // Checked future over a ListenableFuture whose checked exception type is RpcException.
+ public RpcCheckedFuture(ListenableFuture<T> delegate) { // delegate: the future being wrapped
+ super(delegate);
+ }
+
+ @Override
+ protected RpcException mapException(Exception e) { // AbstractCheckedFuture hook that translates a failure of the delegate
+ return RpcException.mapException(e); // translation itself is done by the static helper on RpcException
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
new file mode 100644
index 0000000..0f55488
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcConnectionHandler<T extends RemoteConnection> { // Callback interface with one method for a successful connection and one for a failed attempt.
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class); // declared in an interface, so it is implicitly public as well
+
+ public static enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION} // category passed to connectionFailed; which stage raises each value is not visible here — confirm at the call sites
+
+ public void connectionSucceeded(T connection); // receives the connection of type T
+ public void connectionFailed(FailureType type, Throwable t); // receives the failure category together with a Throwable
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index ca66481..500f959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
+import java.util.concurrent.ExecutionException;
+
import org.apache.drill.common.exceptions.DrillIOException;
/**
@@ -41,5 +43,16 @@ public class RpcException extends DrillIOException{
super(cause);
}
+ public static RpcException mapException(Throwable t){ // Converts any failure into an RpcException; returns t itself when it already is one.
+ while(t instanceof ExecutionException && t.getCause() != null) t = t.getCause(); // unwrap nested ExecutionExceptions, but stop at one without a cause so the failure is never replaced by null
+ if(t instanceof RpcException) return ((RpcException) t);
+ return new RpcException(t);
+ }
+
+ public static RpcException mapException(String message, Throwable t){ // Always builds a new RpcException carrying message, with the unwrapped failure as its cause.
+ while(t instanceof ExecutionException && t.getCause() != null) t = t.getCause(); // unwrap nested ExecutionExceptions, but stop at one without a cause so the cause is never replaced by null
+ return new RpcException(message, t);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
new file mode 100644
index 0000000..a25e5e7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcOutcome<T> { // Sink for the result of an RPC: accepts either a value or a failure.
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class); // declared in an interface, so it is implicitly public as well
+
+ public void set(Object value); // typed as Object rather than T; RpcBus passes the parsed response through an RpcOutcome<?>
+ public void setException(Throwable t); // records a failure instead of a value
+ public Class<T> getOutcomeType(); // class token for T; presumably used to validate the response type — confirm against the implementations
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index fac908c..771edcf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -17,11 +17,10 @@
******************************************************************************/
package org.apache.drill.exec.rpc;
-public abstract class RpcOutcomeListener<V> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+public interface RpcOutcomeListener<V> {
- public void failed(RpcException ex){};
- public void success(V value){};
+ public void failed(RpcException ex);
+ public void success(V value);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 20a7d7d..318abb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -36,7 +36,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
if(!ctx.channel().isOpen()){
- logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+ if(in.readableBytes() > 0) logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
in.skipBytes(in.readableBytes());
return;
}