You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text version.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/02 18:51:23 UTC

[2/6] git commit: DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.

DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/760cbd42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/760cbd42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/760cbd42

Branch: refs/heads/master
Commit: 760cbd421c131ed43f5011764c7e244b661bd84b
Parents: 623a52e
Author: Aman Sinha <as...@maprtech.com>
Authored: Wed May 28 17:32:05 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 2 09:12:09 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ScreenCreator.java |  4 +-
 .../BroadcastSenderRootExec.java                | 51 +++++++++++++++-----
 .../exec/planner/physical/PlannerSettings.java  | 11 +++--
 .../apache/drill/exec/record/WritableBatch.java |  6 +++
 .../apache/drill/exec/rpc/data/DataTunnel.java  | 12 ++---
 .../server/options/SystemOptionManager.java     |  1 +
 6 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c92633f..9aefbe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -74,6 +74,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     public boolean next() {
       if(!ok){
         stop();
+        context.fail(this.listener.ex);
         return false;
       }
 
@@ -135,7 +136,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     private SendListener listener = new SendListener();
 
     private class SendListener extends BaseRpcOutcomeListener<Ack>{
-
+      volatile RpcException ex; 
 
 
       @Override
@@ -150,6 +151,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         logger.error("Failure while sending data to user.", ex);
         ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
         ok = false;
+        this.ex = ex;
       }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 0a01583..9c55825 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.broadcastsender;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
@@ -26,13 +28,15 @@ import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.work.ErrorHelper;
 
 /**
  * Broadcast Sender broadcasts incoming batches to all receivers (one or more).
@@ -47,7 +51,6 @@ public class BroadcastSenderRootExec implements RootExec {
   private final ExecProtos.FragmentHandle handle;
   private volatile boolean ok;
   private final RecordBatch incoming;
-  private final DrillRpcFuture[] responseFutures;
 
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
@@ -63,12 +66,12 @@ public class BroadcastSenderRootExec implements RootExec {
       FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build();
       tunnels[i] = context.getDataTunnel(destinations.get(i), opp);
     }
-    responseFutures = new DrillRpcFuture[destinations.size()];
   }
 
   @Override
   public boolean next() {
     if(!ok) {
+      context.fail(statusHandler.ex);
       return false;
     }
 
@@ -79,24 +82,25 @@ public class BroadcastSenderRootExec implements RootExec {
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
-          responseFutures[i] = tunnels[i].sendRecordBatch(context, b2);
+          tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          statusHandler.sendCount.increment();
         }
 
-        waitAllFutures(false);
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         WritableBatch writableBatch = incoming.getWritableBatch();
+        if (tunnels.length > 1) {
+          writableBatch.retainBuffers(tunnels.length - 1);  
+        }
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
-          if(i > 0) {
-            writableBatch.retainBuffers();
-          }
-          responseFutures[i] = tunnels[i].sendRecordBatch(context, batch);
+          tunnels[i].sendRecordBatch(this.statusHandler, batch);   
+          statusHandler.sendCount.increment();
         }
 
-        return waitAllFutures(true);
+        return ok;
 
       case NOT_YET:
       default:
@@ -104,6 +108,7 @@ public class BroadcastSenderRootExec implements RootExec {
     }
   }
 
+  /*
   private boolean waitAllFutures(boolean haltOnError) {
     for (DrillRpcFuture<?> responseFuture : responseFutures) {
       try {
@@ -124,10 +129,34 @@ public class BroadcastSenderRootExec implements RootExec {
     }
     return true;
   }
-
+*/
+  
   @Override
   public void stop() {
       ok = false;
+      statusHandler.sendCount.waitForSendComplete();
       incoming.cleanup();
   }
+  
+  private StatusHandler statusHandler = new StatusHandler();
+  private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
+    volatile RpcException ex;
+    private final SendingAccountor sendCount = new SendingAccountor();
+    
+    @Override
+    public void success(Ack value, ByteBuf buffer) {
+      sendCount.decrement();
+      super.success(value, buffer);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      sendCount.decrement();
+      logger.error("Failure while sending data to user.", ex);
+      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+      ok = false;
+      this.ex = ex;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 18a32af..ad9fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -22,13 +22,15 @@ import net.hydromatic.optiq.tools.FrameworkContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 
 public class PlannerSettings implements FrameworkContext{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class);
 
   private int numEndPoints = 0;
   private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
-  private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold
+
+  public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE; 
 
   public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
   public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
@@ -36,7 +38,8 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
   public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
-  public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
+  public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
+  public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000);
 
   public OptionManager options = null;
 
@@ -88,8 +91,8 @@ public class PlannerSettings implements FrameworkContext{
     return options.getOption(BROADCAST.getOptionName()).bool_val;
   }
 
-  public int getBroadcastThreshold() {
-    return broadcastThreshold;
+  public long getBroadcastThreshold() {
+    return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4ff3708..14ade39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,4 +163,10 @@ public class WritableBatch {
       buf.retain();
     }
   }
+  
+  public void retainBuffers(int increment) {
+    for (ByteBuf buf : buffers) {
+      buf.retain(increment);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 1dcd89e..98bbeeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -37,22 +37,22 @@ public class DataTunnel {
   }
 
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
-    SendBatch b = new SendBatch(outcomeListener, batch);
+    SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
     manager.runCommand(b);
   }
 
   public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    SendBatchAsync b = new SendBatchAsync(batch, context);
+    SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
     manager.runCommand(b);
     return b.getFuture();
   }
 
 
   
-  public static class SendBatch extends ListeningCommand<Ack, DataClientConnection> {
+  private static class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
     final FragmentWritableBatch batch;
 
-    public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
+    public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
       super(listener);
       this.batch = batch;
     }
@@ -70,11 +70,11 @@ public class DataTunnel {
     
   }
 
-  public static class SendBatchAsync extends FutureBitCommand<Ack, DataClientConnection> {
+  private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
     final FragmentWritableBatch batch;
     final FragmentContext context;
 
-    public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) {
+    public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
       super();
       this.batch = batch;
       this.context = context;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8d9a68f..3e90eb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -41,6 +41,7 @@ public class SystemOptionManager implements OptionManager{
       PlannerSettings.MERGEJOIN, 
       PlannerSettings.MULTIPHASE,
       PlannerSettings.BROADCAST,
+      PlannerSettings.BROADCAST_THRESHOLD,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR
   };