You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/02 18:51:23 UTC
[2/6] git commit: DRILL-853: Enable broadcast joins and fix some
issues with BroadcastExchange and ScreenCreator.
DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/760cbd42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/760cbd42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/760cbd42
Branch: refs/heads/master
Commit: 760cbd421c131ed43f5011764c7e244b661bd84b
Parents: 623a52e
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed May 28 17:32:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:09 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/ScreenCreator.java | 4 +-
.../BroadcastSenderRootExec.java | 51 +++++++++++++++-----
.../exec/planner/physical/PlannerSettings.java | 11 +++--
.../apache/drill/exec/record/WritableBatch.java | 6 +++
.../apache/drill/exec/rpc/data/DataTunnel.java | 12 ++---
.../server/options/SystemOptionManager.java | 1 +
6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c92633f..9aefbe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -74,6 +74,7 @@ public class ScreenCreator implements RootCreator<Screen>{
public boolean next() {
if(!ok){
stop();
+ context.fail(this.listener.ex);
return false;
}
@@ -135,7 +136,7 @@ public class ScreenCreator implements RootCreator<Screen>{
private SendListener listener = new SendListener();
private class SendListener extends BaseRpcOutcomeListener<Ack>{
-
+ volatile RpcException ex;
@Override
@@ -150,6 +151,7 @@ public class ScreenCreator implements RootCreator<Screen>{
logger.error("Failure while sending data to user.", ex);
ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
ok = false;
+ this.ex = ex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 0a01583..9c55825 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.broadcastsender;
+import io.netty.buffer.ByteBuf;
+
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
@@ -26,13 +28,15 @@ import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.work.ErrorHelper;
/**
* Broadcast Sender broadcasts incoming batches to all receivers (one or more).
@@ -47,7 +51,6 @@ public class BroadcastSenderRootExec implements RootExec {
private final ExecProtos.FragmentHandle handle;
private volatile boolean ok;
private final RecordBatch incoming;
- private final DrillRpcFuture[] responseFutures;
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
@@ -63,12 +66,12 @@ public class BroadcastSenderRootExec implements RootExec {
FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
}
- responseFutures = new DrillRpcFuture[destinations.size()];
}
@Override
public boolean next() {
if(!ok) {
+ context.fail(statusHandler.ex);
return false;
}
@@ -79,24 +82,25 @@ public class BroadcastSenderRootExec implements RootExec {
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
- responseFutures[i] = tunnels[i].sendRecordBatch(context, b2);
+ tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ statusHandler.sendCount.increment();
}
- waitAllFutures(false);
return false;
case OK_NEW_SCHEMA:
case OK:
WritableBatch writableBatch = incoming.getWritableBatch();
+ if (tunnels.length > 1) {
+ writableBatch.retainBuffers(tunnels.length - 1);
+ }
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
- if(i > 0) {
- writableBatch.retainBuffers();
- }
- responseFutures[i] = tunnels[i].sendRecordBatch(context, batch);
+ tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ statusHandler.sendCount.increment();
}
- return waitAllFutures(true);
+ return ok;
case NOT_YET:
default:
@@ -104,6 +108,7 @@ public class BroadcastSenderRootExec implements RootExec {
}
}
+ /*
private boolean waitAllFutures(boolean haltOnError) {
for (DrillRpcFuture<?> responseFuture : responseFutures) {
try {
@@ -124,10 +129,34 @@ public class BroadcastSenderRootExec implements RootExec {
}
return true;
}
-
+*/
+
@Override
public void stop() {
ok = false;
+ statusHandler.sendCount.waitForSendComplete();
incoming.cleanup();
}
+
+  private final StatusHandler statusHandler = new StatusHandler();
+
+  /**
+   * Tracks the outcome of the asynchronous batch sends issued to every receiving tunnel.
+   * Each completed send (success or failure) decrements {@code sendCount}; a failure also
+   * records the exception and clears the sender's {@code ok} flag so that the next call to
+   * {@code next()} reports it through {@code context.fail(...)}.
+   */
+  private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
+    /** Exception from a failed send; read by next() after it observes ok == false. */
+    volatile RpcException ex;
+    /** Outstanding sends; stop() waits on this before cleaning up the incoming batch. */
+    private final SendingAccountor sendCount = new SendingAccountor();
+
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      sendCount.decrement();
+      super.success(value, buffer);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      try {
+        // Publish the exception BEFORE clearing ok. Both fields are volatile, so a thread that
+        // observes ok == false in next() is guaranteed to also observe this exception instead
+        // of passing null to context.fail(...).
+        this.ex = ex;
+        ok = false;
+        logger.error("Failure while broadcasting batch to receiving fragment.", ex);
+        ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while broadcasting batch to receiving fragment.", ex, logger);
+      } finally {
+        // Decrement last and unconditionally: stop() (presumably blocking in waitForSendComplete
+        // until the count drains) must not proceed before the failure is recorded, and must not
+        // hang forever if logging throws.
+        sendCount.decrement();
+      }
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 18a32af..ad9fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -22,13 +22,15 @@ import net.hydromatic.optiq.tools.FrameworkContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValidator;
import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
public class PlannerSettings implements FrameworkContext{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
private int numEndPoints = 0;
private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
- private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold
+
+ public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE;
public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
@@ -36,7 +38,8 @@ public class PlannerSettings implements FrameworkContext{
public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
- public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
+ public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
+ public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000);
public OptionManager options = null;
@@ -88,8 +91,8 @@ public class PlannerSettings implements FrameworkContext{
return options.getOption(BROADCAST.getOptionName()).bool_val;
}
- public int getBroadcastThreshold() {
- return broadcastThreshold;
+  /**
+   * Returns the estimated-row-count threshold below which broadcast inner plans are
+   * considered, as configured by the {@code planner.broadcast_threshold} option.
+   */
+  public long getBroadcastThreshold() {
+    final String optionName = BROADCAST_THRESHOLD.getOptionName();
+    return options.getOption(optionName).num_val;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4ff3708..14ade39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,4 +163,10 @@ public class WritableBatch {
buf.retain();
}
}
+
+  /**
+   * Increments the reference count of every buffer in this batch by {@code increment}.
+   * Used when the same batch is handed to several consumers (e.g. one per receiving tunnel),
+   * each of which will release the buffers once.
+   *
+   * @param increment number of additional references to take; {@code 0} is a no-op
+   * @throws IllegalArgumentException if {@code increment} is negative
+   */
+  public void retainBuffers(int increment) {
+    if (increment < 0) {
+      throw new IllegalArgumentException("increment must be non-negative: " + increment);
+    }
+    if (increment == 0) {
+      // Netty's ByteBuf.retain(int) rejects non-positive increments, so treat 0 as "nothing
+      // to retain" rather than failing (e.g. broadcasting to a single receiver).
+      return;
+    }
+    for (ByteBuf buf : buffers) {
+      buf.retain(increment);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 1dcd89e..98bbeeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -37,22 +37,22 @@ public class DataTunnel {
}
public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
- SendBatch b = new SendBatch(outcomeListener, batch);
+ SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
manager.runCommand(b);
}
public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
- SendBatchAsync b = new SendBatchAsync(batch, context);
+ SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
manager.runCommand(b);
return b.getFuture();
}
- public static class SendBatch extends ListeningCommand<Ack, DataClientConnection> {
+ private static class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
final FragmentWritableBatch batch;
- public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
+ public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
super(listener);
this.batch = batch;
}
@@ -70,11 +70,11 @@ public class DataTunnel {
}
- public static class SendBatchAsync extends FutureBitCommand<Ack, DataClientConnection> {
+ private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
final FragmentWritableBatch batch;
final FragmentContext context;
- public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) {
+ public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
super();
this.batch = batch;
this.context = context;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8d9a68f..3e90eb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -41,6 +41,7 @@ public class SystemOptionManager implements OptionManager{
PlannerSettings.MERGEJOIN,
PlannerSettings.MULTIPHASE,
PlannerSettings.BROADCAST,
+ PlannerSettings.BROADCAST_THRESHOLD,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR
};