You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:39 UTC

[drill] branch master updated (55ac523 -> 0a3ebc2)

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 55ac523  DRILL-6410: Fixed memory leak in flat Parquet reader
     new eb80271  DRILL-6788: Intermittent unit test failure TestDrillbitResilience.failsWhenParsing: Query state should be FAILED (and not COMPLETED) closes #1499
     new d5146c4  DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed Note: Issue was in StreamingAgg where if output from one or multiple input batch was splitting into multiple output batch, then remaining input records were discarded after producing first output batch closes #1490
     new 216b123  DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
     new de76e13  DRILL-6731: Resolving race conditions in RuntimeFilterSink Add condition variable to avoid starvation of producer thread while acquiring queue lock
     new 0a3ebc2  DRILL-6731: use thread pool to run the runtime filter aggregating work closes #1459

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/drill/exec/ops/FragmentContext.java |   8 +-
 .../apache/drill/exec/ops/FragmentContextImpl.java |  20 +-
 .../physical/impl/aggregate/StreamingAggBatch.java |  74 +++--
 .../impl/aggregate/StreamingAggTemplate.java       |  56 ++--
 .../impl/aggregate/StreamingAggregator.java        |  19 +-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  43 +--
 .../exec/physical/impl/join/HashJoinBatch.java     |   5 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |   4 +-
 .../exec/physical/impl/unnest/UnnestImpl.java      |  12 +-
 .../physical/visitor/RuntimeFilterVisitor.java     |  32 +--
 .../drill/exec/record/AbstractRecordBatch.java     |  10 +-
 .../org/apache/drill/exec/work/WorkManager.java    |   4 +-
 .../exec/work/filter/RuntimeFilterReporter.java    |   5 +-
 ...FilterManager.java => RuntimeFilterRouter.java} |  60 +---
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 222 +++++++++++++++
 .../exec/work/filter/RuntimeFilterWritable.java    |  46 ++-
 .../apache/drill/exec/work/foreman/Foreman.java    |  16 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 315 ++++++++++++++++++++-
 .../drill/exec/server/TestDrillbitResilience.java  |   4 +-
 .../org/apache/drill/test/OperatorFixture.java     |  12 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |  12 +-
 .../java/org/apache/drill/exec/proto/BitData.java  | 127 ++++++++-
 .../org/apache/drill/exec/proto/SchemaBitData.java |   7 +
 .../drill/exec/proto/beans/RuntimeFilterBDef.java  |  22 ++
 protocol/src/main/protobuf/BitData.proto           |   1 +
 25 files changed, 940 insertions(+), 196 deletions(-)
 rename exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/{RuntimeFilterManager.java => RuntimeFilterRouter.java} (87%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java


[drill] 05/05: DRILL-6731: use thread pool to run the runtime filter aggregating work closes #1459

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0a3ebc24d941f75841dcb2a6251e22d2a988c46c
Author: weijie.tong <we...@alipay.com>
AuthorDate: Tue Oct 9 20:28:22 2018 +0800

    DRILL-6731: use thread pool to run the runtime filter aggregating work
    closes #1459
---
 .../org/apache/drill/exec/ops/FragmentContextImpl.java   | 10 ++++++----
 .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++--------
 .../test/java/org/apache/drill/test/OperatorFixture.java |  3 ++-
 .../org/apache/drill/test/PhysicalOpUnitTestBase.java    |  2 +-
 4 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 1f9d489..fcfdc8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -209,7 +209,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
-    this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
+    boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    if (enableRF) {
+      ExecutorService executorService = context.getExecutor();
+      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+    }
   }
 
   /**
@@ -472,9 +476,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterSink != null) {
-      suppressingClose(runtimeFilterSink);
-    }
+    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 754c68e..1468625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -18,10 +18,11 @@
 package org.apache.drill.exec.work.filter;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,18 +64,17 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   private ReentrantLock aggregatedRFLock = new ReentrantLock();
 
-  private Thread asyncAggregateThread;
-
   private BufferAllocator bufferAllocator;
 
+  private Future future;
+
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
     this.bufferAllocator = bufferAllocator;
     AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-    asyncAggregateThread.start();
+    future = executorService.submit(asyncAggregateWorker);
   }
 
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
@@ -158,7 +158,7 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   @Override
   public void close() throws Exception {
-    asyncAggregateThread.interrupt();
+    future.cancel(true);
     doCleanup();
   }
 
@@ -209,7 +209,7 @@ public class RuntimeFilterSink implements AutoCloseable {
           currentBookId.incrementAndGet();
         }
       } catch (InterruptedException e) {
-        logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+        logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
         Thread.currentThread().interrupt();
       } finally {
         doCleanup();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 81d0d1a..a1e7d0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -81,6 +81,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -197,7 +198,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
-      this.runtimeFilterSink = new RuntimeFilterSink(allocator);
+      this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 559f7f4..300e88b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -209,7 +209,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
-      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
+      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
     }
 
     @Override


[drill] 02/05: DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed Note: Issue was in StreamingAgg where if output from one or multiple input batch was splitting into multiple output batch, then remaining input records were discarded after producing first output batch closes #1490

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d5146c43986f09f132f4e96966082732a3740181
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Mon Oct 1 14:15:33 2018 -0700

    DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed
    Note: Issue was in StreamingAgg where if output from one or multiple input batch was splitting into multiple output batch, then remaining input
    records were discarded after producing first output batch
    closes #1490
---
 .../physical/impl/aggregate/StreamingAggBatch.java |  74 +++--
 .../impl/aggregate/StreamingAggTemplate.java       |  56 ++--
 .../impl/aggregate/StreamingAggregator.java        |  19 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |   4 +-
 .../exec/physical/impl/unnest/UnnestImpl.java      |  12 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  10 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 315 ++++++++++++++++++++-
 7 files changed, 423 insertions(+), 67 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2b9b317..ffcfa78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
@@ -71,6 +72,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
@@ -104,7 +106,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                                  // call to inner next is made.
   private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
                                     // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
-                                    // we send out an emopty batch with EMIT.
+                                    // we send out an empty batch with EMIT.
   private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
   private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
@@ -127,7 +129,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private boolean specialBatchSent = false;
   private static final int SPECIAL_BATCH_COUNT = 1;
 
-  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  // TODO: Needs to adapt to batch sizing rather than hardcoded constant value
+  private int maxOutputRowCount = ValueVector.MAX_ROW_COUNT;
+
+  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context)
+    throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
 
@@ -189,7 +195,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
     if (done || specialBatchSent) {
-      assert (sendEmit != true); // if special batch sent with emit then flag will not be set
+      assert (!sendEmit); // if special batch sent with emit then flag will not be set
       return NONE;
     }
 
@@ -199,6 +205,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
+      firstBatchForSchema = false;
       recordCount = 0;
       specialBatchSent = false;
       return EMIT;
@@ -239,18 +246,17 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
             done = true;
             return IterOutcome.STOP;
           }
+          firstBatchForSchema = true;
           break;
         case EMIT:
           // if we get an EMIT with an empty batch as the first (and therefore only) batch
           // we have to do the special handling
           if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            firstBatchForDataSet = true; // reset on the next iteration
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
             // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
-            IterOutcome finalOutcome =  getFinalOutcome();
-            return finalOutcome;
+            return  getFinalOutcome();
           }
           // else fall thru
         case OK:
@@ -259,15 +265,18 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
       }
     } else {
-      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
+      // If this is not the first batch and previous batch is fully processed with no error condition or NONE is not
+      // seen then it will call next() on upstream to get new batch. Otherwise just process the previous incoming batch
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()
+        && aggregator.previousBatchProcessed()) {
         lastKnownOutcome = incoming.next();
         if (!first ) {
           //Setup needs to be called again. During setup, generated code saves a reference to the vectors
-          // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to
+          // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to
           // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
           // the vectors are replaced and the reference to the old vectors is no longer valid
           try {
-            aggregator.setup(oContext, incoming, this);
+            aggregator.setup(oContext, incoming, this, maxOutputRowCount);
           } catch (SchemaChangeException e) {
             UserException.Builder exceptionBuilder = UserException.functionError(e)
                 .message("A Schema change exception occured in calling setup() in generated code.");
@@ -280,8 +289,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     recordCount = aggregator.getOutputCount();
     container.setRecordCount(recordCount);
     logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount());
-    // overwrite the outcome variable since we no longer need to remember the first batch outcome
-    lastKnownOutcome = aggregator.getOutcome();
+    // get the returned IterOutcome from aggregator and based on AggOutcome and returned IterOutcome update the
+    // lastKnownOutcome below. For example: if AggOutcome is RETURN_AND_RESET then lastKnownOutcome is always set to
+    // EMIT
+    IterOutcome returnOutcome = aggregator.getOutcome();
     switch (aggOutcome) {
       case CLEANUP_AND_RETURN:
         if (!first) {
@@ -289,7 +300,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         }
         done = true;
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        return returnOutcome;
       case RETURN_AND_RESET:
         //WE could have got a string of batches, all empty, until we hit an emit
         if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
@@ -298,28 +309,32 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           // If outcome is NONE then we send the special batch in the first iteration and the NONE
           // outcome in the next iteration. If outcome is EMIT, we can send the special
           // batch and the EMIT outcome at the same time.
-
-          IterOutcome finalOutcome =  getFinalOutcome();
-          return finalOutcome;
+          return getFinalOutcome();
         }
         firstBatchForDataSet = true;
         firstBatchForSchema = false;
         if(first) {
           first = false;
         }
-        if(lastKnownOutcome == OK_NEW_SCHEMA) {
-          sendEmit = true;
+        // Since AggOutcome is RETURN_AND_RESET and returned IterOutcome is OK_NEW_SCHEMA from Aggregator that means it
+        // has seen first batch with OK_NEW_SCHEMA and then last batch with EMIT outcome. In that case if all the input
+        // batch is processed to produce output batch it need to send and empty batch with EMIT outcome in subsequent
+        // next call.
+        if(returnOutcome == OK_NEW_SCHEMA) {
+          sendEmit = (aggregator == null) || aggregator.previousBatchProcessed();
         }
         // Release external sort batches after EMIT is seen
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        lastKnownOutcome = EMIT;
+        return returnOutcome;
       case RETURN_OUTCOME:
         // In case of complex writer expression, vectors would be added to batch run-time.
         // We have to re-build the schema.
         if (complexWriters != null) {
           container.buildSchema(SelectionVectorMode.NONE);
         }
-        if (lastKnownOutcome == IterOutcome.NONE ) {
+        if (returnOutcome == IterOutcome.NONE ) {
+          lastKnownOutcome = NONE;
           // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome
           // to determine whether we should set the flag or not.
           // This is so that if someone calls getRecordCount in between calls to innerNext, we will
@@ -330,11 +345,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           } else {
             return OK;
           }
-        } else if (lastKnownOutcome == OK && first) {
+        } else if (returnOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
+          returnOutcome = OK_NEW_SCHEMA;
         }
         first = false;
-        return lastKnownOutcome;
+        return returnOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
         // If the former, we can handle the change and so need to update the aggregator and
@@ -342,8 +358,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         // and exception
         // This case is not tested since there are no unit tests for this and there is no support
         // from the sort operator for this case
-        if (lastKnownOutcome == EMIT) {
+        if (returnOutcome == EMIT) {
           createAggregator();
+          lastKnownOutcome = EMIT;
           return OK_NEW_SCHEMA;
         } else {
           context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
@@ -351,6 +368,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                   incoming.getSchema()).getMessage()).build(logger));
           close();
           killIncoming(false);
+          lastKnownOutcome = STOP;
           return IterOutcome.STOP;
         }
       default:
@@ -433,7 +451,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //  cg.getCodeGenerator().saveCodeForDebugging(true);
+    //cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
@@ -506,7 +524,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this);
+    agg.setup(oContext, incoming, this, maxOutputRowCount);
     allocateComplexWriters();
     return agg;
   }
@@ -651,7 +669,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public void dump() {
-    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
-        container, popConfig, aggregator, incomingSchema);
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema);
+  }
+
+  @VisibleForTesting
+  public void setMaxOutputRowCount(int maxOutputRowCount) {
+    this.maxOutputRowCount = maxOutputRowCount;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4bde7ab..cc89f23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -33,7 +34,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+  private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
 
   // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
@@ -54,7 +55,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
   private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
                                    // aggregate group begins (including every time a new data set begins)
-  private int currentIndex; // current index being processed
+  private int currentIndex = Integer.MAX_VALUE; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
@@ -72,10 +73,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
 
   @Override
-  public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
+  public void setup(OperatorContext context, RecordBatch incoming,
+                    StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.maxOutputRows = outputRowCount;
     setupInterior(incoming, outgoing);
   }
 
@@ -109,7 +112,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       allocateOutgoing();
 
       if (firstBatchForDataSet) {
-        this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
+        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);
 
         if (outerOutcome == OK_NEW_SCHEMA) {
           firstBatchForSchema = true;
@@ -178,9 +181,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         // loop through existing records, adding as necessary.
         if(!processRemainingRecordsInBatch()) {
           // output batch is full. Return.
-          return setOkAndReturn();
+          return setOkAndReturn(outerOutcome);
         }
-        // if the current batch came with an EMIT, we're done
+        // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all
+        // the rows in incoming batch
         if(outerOutcome == EMIT) {
           // output the last record
           outputToBatch(previousIndex);
@@ -215,14 +219,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 done = true;
                 lastOutcome = out;
                 if (firstBatchForDataSet && addedRecordCount == 0) {
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else if (addedRecordCount > 0) {
                   outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                   // (output container full or not) as we are not going to insert any more records.
                   if (EXTRA_DEBUG) {
                     logger.debug("Received no more batches, returning.");
                   }
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else {
                   // not first batch and record Count == 0
                   outcome = out;
@@ -237,6 +241,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                 } else {
                   resetIndex();
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of previous batch, adding.");
@@ -256,13 +261,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                         if (EXTRA_DEBUG) {
                           logger.debug("Output container is full. flushing it.");
                         }
-                        return setOkAndReturnEmit();
+                        return setOkAndReturn(EMIT);
                       }
                     }
                     // important to set the previous index to -1 since we start a new group
                     previousIndex = -1;
                   }
-                  processRemainingRecordsInBatch();
+                  if (!processRemainingRecordsInBatch()) {
+                    // output batch is full. Return.
+                    return setOkAndReturn(EMIT);
+                  }
                   outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
                 }
                 resetIndex();
@@ -285,7 +293,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                     logger.debug("Wrote out end of previous batch, returning.");
                   }
                   newSchema = true;
-                  return setOkAndReturn();
+                  return setOkAndReturn(OK_NEW_SCHEMA);
                 }
                 cleanup();
                 return AggOutcome.UPDATE_AGGREGATOR;
@@ -294,6 +302,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 if (incoming.getRecordCount() == 0) {
                   continue;
                 } else {
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of previous batch, adding.");
@@ -315,7 +324,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                           logger.debug("Output container is full. flushing it.");
                         }
                         previousIndex = -1;
-                        return setOkAndReturn();
+                        return setOkAndReturn(OK);
                       }
                     }
                     previousIndex = -1;
@@ -405,8 +414,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   }
 
   private final void resetIndex() {
-    underlyingIndex = -1;
-    incIndex();
+    underlyingIndex = 0;
+    currentIndex = Integer.MAX_VALUE;
   }
 
   /**
@@ -414,7 +423,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
    *
    * @return outcome
    */
-  private final AggOutcome setOkAndReturn() {
+  private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) {
     IterOutcome outcomeToReturn;
     firstBatchForDataSet = false;
     if (firstBatchForSchema) {
@@ -428,7 +437,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
-    return AggOutcome.RETURN_OUTCOME;
+    return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME;
   }
 
   /**
@@ -457,7 +466,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   // Returns output container status after insertion of the given record. Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
 
     outputRecordKeys(inIndex, outputCount);
@@ -470,14 +479,13 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   // Returns output container status after insertion of the given record. Caller must check the return value if it
   // plans to inserts more record into outgoing container.
   private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
 
     outputRecordKeysPrev(b1, inIndex, outIndex);
@@ -485,8 +493,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   private void addRecordInc(int index) {
@@ -508,6 +515,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         + "]";
   }
 
+  @Override
+  public boolean previousBatchProcessed() {
+    return (currentIndex == Integer.MAX_VALUE);
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 23fdcc1..57caa9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -25,7 +25,8 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public interface StreamingAggregator {
 
-  public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
+  TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION =
+    new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
 
 
   /**
@@ -45,25 +46,27 @@ public interface StreamingAggregator {
    * <p>
    * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome
    */
-  public static enum AggOutcome {
+  enum AggOutcome {
     RETURN_OUTCOME,
     CLEANUP_AND_RETURN,
     UPDATE_AGGREGATOR,
     RETURN_AND_RESET;
   }
 
-  public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
+  void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing, int outputRowCount)
+    throws SchemaChangeException;
 
-  public abstract IterOutcome getOutcome();
+  IterOutcome getOutcome();
 
-  public abstract int getOutputCount();
+  int getOutputCount();
 
   // do the work. Also pass in the Iteroutcome of the batch already read in case it might be an EMIT. If the
   // outerOutcome is EMIT, we need to do the work without reading any more batches.
-  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+  AggOutcome doWork(IterOutcome outerOutcome);
 
-  public abstract boolean isDone();
+  boolean isDone();
 
-  public abstract void cleanup();
+  void cleanup();
 
+  boolean previousBatchProcessed();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 242687f..735f11f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -389,7 +389,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
-      logger.debug("Setting up new schema based on incoming batch. Old output schema: %s", container.getSchema());
+      logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema());
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -805,7 +805,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    */
   private void setupNewSchema() throws SchemaChangeException {
 
-    logger.debug("Setting up new schema based on incoming batch. New left schema: %s and New right schema: %s",
+    logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}",
       left.getSchema(), right.getSchema());
 
     // Clear up the container
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 508999f..a9c9598 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -118,6 +118,7 @@ public class UnnestImpl implements Unnest {
     Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs.");
 
     final int initialInnerValueIndex = runningInnerValueIndex;
+    int nonEmptyArray = 0;
 
     outer:
     {
@@ -126,8 +127,12 @@ public class UnnestImpl implements Unnest {
 
       for (; valueIndex < valueCount; valueIndex++) {
         final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
-        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}",
-            innerValueCount, recordCount, outputLimit);
+        logger.trace("Unnest: CurrentRowId: {}, innerValueCount: {}, outputIndex: {},  output limit: {}",
+            valueIndex, innerValueCount, outputIndex, outputLimit);
+
+        if (innerValueCount > 0) {
+          ++nonEmptyArray;
+        }
 
         for (; innerValueIndex < innerValueCount; innerValueIndex++) {
           // If we've hit the batch size limit, stop and flush what we've got so far.
@@ -148,6 +153,9 @@ public class UnnestImpl implements Unnest {
       }  // forevery value in the array
     }  // for every incoming record
     final int delta = runningInnerValueIndex - initialInnerValueIndex;
+    logger.debug("Unnest: Finished processing current batch. [Details: LastProcessedRowIndex: {}, " +
+      "RowsWithNonEmptyArrays: {}, outputIndex: {}, outputLimit: {}, TotalIncomingRecords: {}]",
+      valueIndex, nonEmptyArray, delta, outputLimit, accessor.getValueCount());
     final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(initialInnerValueIndex, delta);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 362ea29..eb6112d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -135,13 +135,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
       return next;
     }
 
-    switch(next) {
+    boolean isNewSchema = false;
+    logger.debug("Received next batch for index: {} with outcome: {}", inputIndex, next);
+    switch (next) {
       case OK_NEW_SCHEMA:
-        stats.batchReceived(inputIndex, b.getRecordCount(), true);
-        break;
+        isNewSchema = true;
       case OK:
       case EMIT:
-        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        stats.batchReceived(inputIndex, b.getRecordCount(), isNewSchema);
+        logger.debug("Number of records in received batch: {}", b.getRecordCount());
         break;
       default:
         break;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index cead984..37a44ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -527,6 +527,232 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     nonEmptyInputRowSet2.clear();
   }
 
+  /**
+   * Verifies scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT whose output is split
+   * into multiple output batches is handled correctly such that first output is produced with OK_NEW_SCHEMA and then
+   * followed by EMIT outcome
+   */
+  @Test
+  public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Expect OK_NEW_SCHEMA first for all the input batch from second batch onwards since output batch is full after
+    // producing 2 groups as output
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    // The last group was produced in different output batch with EMIT outcome
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
+  /**
+   * Verifies scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT whose output is split
+   * into multiple output batches and incoming batches received with OK,OK,EMIT whose output is also split across
+   * multiple output batches is handled correctly.
+   */
+  @Test
+  public void t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet6 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 50, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet7 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 130, "item3")
+      .addRow(3, 130, "item3")
+      .addRow(4, 140, "item4")
+      .addRow(4, 140, "item4")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet3 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item2", 2, (long)2)
+      .addRow("item3", 3, (long)2)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet4 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item4", 4, (long)2)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(nonEmptyInputRowSet6.container());
+    inputContainer.add(nonEmptyInputRowSet7.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Output batches for input batch 2 to 5
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    // Output batches for input batch 6 to 8
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    // output batch is full after producing 2 rows
+    assertEquals(2, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet3).verify(actualRowSet);
+
+    // output batch with pending rows
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet4).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+    nonEmptyInputRowSet5.clear();
+    nonEmptyInputRowSet6.clear();
+    nonEmptyInputRowSet7.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+    expectedRowSet3.clear();
+    expectedRowSet4.clear();
+  }
 
   /*****************************************************************************************
    Tests for validating regular StreamingAggr behavior with no EMIT outcome
@@ -620,6 +846,88 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  @Test
+  public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
   /*******************************************************
    * Tests for EMIT with empty batches and no group by
    * (Tests t1-t8 are repeated with no group by)
@@ -813,14 +1121,15 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, strAggBatch.getRecordCount());
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // data batch
 
     RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
     new RowSetComparison(expectedRowSet).verify(actualRowSet);


[drill] 04/05: DRILL-6731: Resolving race conditions in RuntimeFilterSink Add condition variable to avoid starvation of producer thread while acquiring queue lock

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit de76e135316086386e2f7edd04ec1d5ca479bc59
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Sep 26 13:20:02 2018 -0700

    DRILL-6731: Resolving race conditions in RuntimeFilterSink
    Add condition variable to avoid starvation of producer thread while acquiring queue lock
---
 .../physical/visitor/RuntimeFilterVisitor.java     |  12 +-
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 127 +++++++++++++++------
 .../exec/work/filter/RuntimeFilterWritable.java    |  36 +++---
 3 files changed, 114 insertions(+), 61 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index bfba5f2..fcfa2bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -222,18 +222,10 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
           holder.setFromBuildSide(true);
           right.accept(this, holder);
           boolean routeToForeman = holder.needToRouteToForeman();
-          if (!routeToForeman) {
-            runtimeFilterDef.setSendToForeman(false);
-          } else {
-            runtimeFilterDef.setSendToForeman(true);
-          }
+          runtimeFilterDef.setSendToForeman(routeToForeman);
           List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
           for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-            if (!routeToForeman) {
-              bloomFilterDef.setLocal(true);
-            } else {
-              bloomFilterDef.setLocal(false);
-            }
+            bloomFilterDef.setLocal(!routeToForeman);
           }
         }
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 8f4c823..754c68e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -38,12 +39,28 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   private int staleBookId = 0;
 
+  /**
+   * RuntimeFilterWritable holding the aggregated version of all the received filter
+   */
   private RuntimeFilterWritable aggregated = null;
 
   private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
+  /**
+   * Flag used by Minor Fragment thread to indicate it has encountered error
+   */
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  /**
+   * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
+   * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
+   * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
+   * indicate producer not to put any new elements in it.
+   */
+  private ReentrantLock queueLock = new ReentrantLock();
+
+  private Condition notEmpty = queueLock.newCondition();
+
   private ReentrantLock aggregatedRFLock = new ReentrantLock();
 
   private Thread asyncAggregateThread;
@@ -62,24 +79,34 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
     if (running.get()) {
-      if (containOne()) {
-        boolean same = aggregated.same(runtimeFilterWritable);
-        if (!same) {
-          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-          //share the same FragmentContext.
-          try {
-            aggregatedRFLock.lock();
+      try {
+        aggregatedRFLock.lock();
+        if (containOne()) {
+          boolean same = aggregated.equals(runtimeFilterWritable);
+          if (!same) {
+            // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+            // share the same FragmentContext.
             aggregated.close();
-            aggregated = null;
-          } finally {
-            aggregatedRFLock.unlock();
+            currentBookId.set(0);
+            staleBookId = 0;
+            clearQueued(false);
           }
-          currentBookId.set(0);
-          staleBookId = 0;
-          clearQueued();
         }
+      } finally {
+        aggregatedRFLock.unlock();
+      }
+
+      try {
+        queueLock.lock();
+        if (rfQueue != null) {
+          rfQueue.add(runtimeFilterWritable);
+          notEmpty.signal();
+        } else {
+          runtimeFilterWritable.close();
+        }
+      } finally {
+        queueLock.unlock();
       }
-      rfQueue.add(runtimeFilterWritable);
     } else {
       runtimeFilterWritable.close();
     }
@@ -116,53 +143,77 @@ public class RuntimeFilterSink implements AutoCloseable {
     return aggregated != null;
   }
 
-  @Override
-  public void close() throws Exception {
+  private void doCleanup() {
     running.compareAndSet(true, false);
-    asyncAggregateThread.interrupt();
-    if (containOne()) {
-      try {
-        aggregatedRFLock.lock();
+    try {
+      aggregatedRFLock.lock();
+      if (containOne()) {
         aggregated.close();
-      } finally {
-        aggregatedRFLock.unlock();
+        aggregated = null;
       }
+    } finally {
+      aggregatedRFLock.unlock();
     }
-    clearQueued();
   }
 
-  private void clearQueued() {
+  @Override
+  public void close() throws Exception {
+    asyncAggregateThread.interrupt();
+    doCleanup();
+  }
+
+  private void clearQueued(boolean setToNull) {
     RuntimeFilterWritable toClear;
-    while ((toClear = rfQueue.poll()) != null) {
-      toClear.close();
+    try {
+      queueLock.lock();
+      while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
+        toClear.close();
+      }
+      rfQueue = (setToNull) ? null : rfQueue;
+    } finally {
+      queueLock.unlock();
     }
   }
 
-  class AsyncAggregateWorker implements Runnable {
+  private class AsyncAggregateWorker implements Runnable {
 
     @Override
     public void run() {
       try {
+        RuntimeFilterWritable toAggregate = null;
         while (running.get()) {
-          RuntimeFilterWritable toAggregate = rfQueue.take();
-          if (!running.get()) {
-            toAggregate.close();
-            return;
+          try {
+            queueLock.lock();
+            toAggregate = (rfQueue != null) ? rfQueue.poll() :  null;
+            if (toAggregate == null) {
+              notEmpty.await();
+              continue;
+            }
+          } finally {
+            queueLock.unlock();
           }
-          if (containOne()) {
-            try {
-              aggregatedRFLock.lock();
+
+          try {
+            aggregatedRFLock.lock();
+            if (containOne()) {
               aggregated.aggregate(toAggregate);
-            } finally {
-              aggregatedRFLock.unlock();
+
+              // Release the byteBuf referenced by toAggregate since aggregate will not do it
+              toAggregate.close();
+            } else {
+              aggregated = toAggregate;
             }
-          } else {
-            aggregated = toAggregate;
+          } finally {
+            aggregatedRFLock.unlock();
           }
           currentBookId.incrementAndGet();
         }
       } catch (InterruptedException e) {
         logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+        Thread.currentThread().interrupt();
+      } finally {
+        doCleanup();
+        clearQueued(true);
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 302a480..9a971e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -36,9 +36,14 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
 
   private DrillBuf[] data;
 
+  private String identifier;
+
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
+    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
 
@@ -90,7 +95,7 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       int capacity = src.readableBytes();
       DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
       int readerIndex = src.readerIndex();
-      src.readBytes(duplicateOne, 0, capacity);
+      duplicateOne.writeBytes(src);
       src.readerIndex(readerIndex);
       cloned[i] = duplicateOne;
       i++;
@@ -98,19 +103,25 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
     return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
-  public boolean same(RuntimeFilterWritable other) {
-    BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
-    int otherMajorId = runtimeFilterDef.getMajorFragmentId();
-    int otherMinorId = runtimeFilterDef.getMinorFragmentId();
-    int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
-    int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
-    int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
-    int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
-    return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  public String toString() {
+    return identifier;
   }
 
-  public String toString() {
-    return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof RuntimeFilterWritable) {
+      RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other;
+      return this.identifier.equals(otherRFW.identifier);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return identifier.hashCode();
   }
 
   @Override
@@ -119,5 +130,4 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       buf.release();
     }
   }
-
 }


[drill] 03/05: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 216b1237739935b04c4f54b3f6f05371a4644085
Author: weijie.tong <we...@alipay.com>
AuthorDate: Thu Sep 6 19:23:35 2018 +0800

    DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
---
 .../org/apache/drill/exec/ops/FragmentContext.java |   8 +-
 .../apache/drill/exec/ops/FragmentContextImpl.java |  16 +-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  43 ++++--
 .../exec/physical/impl/join/HashJoinBatch.java     |   5 +-
 .../physical/visitor/RuntimeFilterVisitor.java     |  24 ++-
 .../org/apache/drill/exec/work/WorkManager.java    |   4 +-
 .../exec/work/filter/RuntimeFilterReporter.java    |   5 +-
 ...FilterManager.java => RuntimeFilterRouter.java} |  60 ++------
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 171 +++++++++++++++++++++
 .../exec/work/filter/RuntimeFilterWritable.java    |  34 +++-
 .../apache/drill/exec/work/foreman/Foreman.java    |  16 +-
 .../org/apache/drill/test/OperatorFixture.java     |  11 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |  12 +-
 .../java/org/apache/drill/exec/proto/BitData.java  | 127 ++++++++++++++-
 .../org/apache/drill/exec/proto/SchemaBitData.java |   7 +
 .../drill/exec/proto/beans/RuntimeFilterBDef.java  |  22 +++
 protocol/src/main/protobuf/BitData.proto           |   1 +
 17 files changed, 448 insertions(+), 118 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c..88c21d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
   void close();
 
   /**
-   * Return null ,if setRuntimeFilter not being called
    * @return
    */
-  RuntimeFilterWritable getRuntimeFilter();
+  RuntimeFilterSink getRuntimeFilterSink();
 
   /**
-   * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+   * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
    * @param runtimeFilter
    */
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
   interface ExecutorState {
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a898078..1f9d489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
 
-  private RuntimeFilterWritable runtimeFilterWritable;
+  private RuntimeFilterSink runtimeFilterSink;
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
+    this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
   }
 
   /**
@@ -348,13 +350,13 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   }
 
   @Override
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterWritable = runtimeFilter;
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+    this.runtimeFilterSink.aggregate(runtimeFilter);
   }
 
   @Override
-  public RuntimeFilterWritable getRuntimeFilter() {
-    return runtimeFilterWritable;
+  public RuntimeFilterSink getRuntimeFilterSink() {
+    return runtimeFilterSink;
   }
 
   /**
@@ -470,8 +472,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterWritable != null) {
-      suppressingClose(runtimeFilterWritable);
+    if (runtimeFilterSink != null) {
+      suppressingClose(runtimeFilterSink);
     }
     suppressingClose(bufferManager);
     suppressingClose(allocator);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580..9248bbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -36,7 +36,9 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -56,6 +58,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   private Map<String, Integer> field2id = new HashMap<>();
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
+  private RuntimeFilterWritable current;
+  private RuntimeFilterWritable previous;
   private int originalRecordCount;
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
@@ -102,6 +106,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.clear();
     }
     super.close();
+    if (current != null) {
+      current.close();
+    }
   }
 
   @Override
@@ -148,30 +155,36 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
    * schema change hash64 should be reset and this method needs to be called again.
    */
   private void setupHashHelper() {
-    final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
-
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     // Check if RuntimeFilterWritable was received by the minor fragment or not
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       return;
     }
-
-    // Check if bloomFilters is initialized or not
-    if (bloomFilters == null) {
-      bloomFilters = runtimeFilterWritable.unwrap();
+    if (runtimeFilterSink.hasFreshOne()) {
+      RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
+      if (current == null) {
+        current = freshRuntimeFilterWritable;
+        previous = freshRuntimeFilterWritable;
+      } else {
+        previous = current;
+        current = freshRuntimeFilterWritable;
+        previous.close();
+      }
+      bloomFilters = current.unwrap();
     }
-
     // Check if HashHelper is initialized or not
     if (hash64 == null) {
       ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context);
       try {
         //generate hash helper
-        this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+        this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList();
         List<LogicalExpression> hashFieldExps = new ArrayList<>();
         List<TypedFieldId> typedFieldIds = new ArrayList<>();
         for (String toFilterField : toFilterFields) {
           SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
           TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
-          this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+          int[] fieldIds = typedFieldId.getFieldIds();
+          this.field2id.put(toFilterField, fieldIds[0]);
           typedFieldIds.add(typedFieldId);
           ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
@@ -195,11 +208,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.setRecordCount(0);
       return;
     }
-
-    final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     sv2.allocateNew(originalRecordCount);
-
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       // means none of the rows are filtered out hence set all the indexes
       for (int i = 0; i < originalRecordCount; ++i) {
         sv2.setIndex(i, i);
@@ -207,10 +218,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.setRecordCount(originalRecordCount);
       return;
     }
-
-    // Setup a hash helper if need be
+    // Setup a hash helper if needed
     setupHashHelper();
-
     //To make each independent bloom filter work together to construct a final filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
     for (int i = 0; i < toFilterFields.size(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 658f03a..3d45696 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -724,7 +724,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
-    //RuntimeFilterManager's judgement will have the RuntimeFilterDef.
+    //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
     if (runtimeFilterDef != null) {
       List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -944,7 +944,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     if (cycleNum == 0 && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman());
+        int hashJoinOpId = this.popConfig.getOperatorId();
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
       }
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index c31e491..bfba5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -197,11 +197,8 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
     @Override
     public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException {
       if (holder != null) {
-        boolean broadcastExchange = exchange instanceof BroadcastExchangePrel;
         if (holder.isFromBuildSide()) {
-          //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark
-          //this HashJoin as BroadcastHashJoin
-          holder.setEncounteredBroadcastExchange(broadcastExchange);
+          holder.setBuildSideExchange(exchange);
         }
       }
       return visitPrel(exchange, holder);
@@ -224,15 +221,15 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
           Prel right = (Prel) hashJoinPrel.getRight();
           holder.setFromBuildSide(true);
           right.accept(this, holder);
-          boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
-          if (buildSideEncountererdBroadcastExchange) {
+          boolean routeToForeman = holder.needToRouteToForeman();
+          if (!routeToForeman) {
             runtimeFilterDef.setSendToForeman(false);
           } else {
             runtimeFilterDef.setSendToForeman(true);
           }
           List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
           for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-            if (buildSideEncountererdBroadcastExchange) {
+            if (!routeToForeman) {
               bloomFilterDef.setLocal(true);
             } else {
               bloomFilterDef.setLocal(false);
@@ -338,18 +335,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
    * RuntimeFilter helper util holder
    */
   private static class RFHelperHolder {
-    //whether this join operator is a partitioned HashJoin or broadcast HashJoin,
-    //also single node HashJoin is not expected to do JPPD.
-    private boolean encounteredBroadcastExchange;
 
     private boolean fromBuildSide;
 
-    public boolean isEncounteredBroadcastExchange() {
-      return encounteredBroadcastExchange;
+    private ExchangePrel exchangePrel;
+
+    public void setBuildSideExchange(ExchangePrel exchange){
+      this.exchangePrel = exchange;
     }
 
-    public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) {
-      this.encounteredBroadcastExchange = encounteredBroadcastExchange;
+    public boolean needToRouteToForeman() {
+      return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel);
     }
 
     public boolean isFromBuildSide() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bf91ed3..0d97e0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -395,7 +395,7 @@ public class WorkManager implements AutoCloseable {
               final String originalName = currentThread.getName();
               currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
               try {
-                foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter);
+                foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
               } catch (Exception e) {
                 logger.warn("Exception while registering the RuntimeFilter", e);
               } finally {
@@ -413,7 +413,7 @@ public class WorkManager implements AutoCloseable {
           .setQueryId(queryId).build();
         FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle);
         if (fragmentExecutor != null) {
-          fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter);
+          fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
         }
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index e6ede7a..6e4a9a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,7 @@ public class RuntimeFilterReporter {
     this.context = context;
   }
 
-  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) {
+  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
     ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
     DrillBuf[] data = new DrillBuf[bloomFilters.size()];
     List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -63,6 +63,7 @@ public class RuntimeFilterReporter {
       .setMajorFragmentId(majorFragmentId)
       .setMinorFragmentId(minorFragmentId)
       .setToForeman(sendToForeman)
+      .setHjOpId(hashJoinOpId)
       .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
       .build();
     RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
@@ -72,7 +73,7 @@ public class RuntimeFilterReporter {
       AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint);
       dataTunnel.sendRuntimeFilter(runtimeFilterWritable);
     } else {
-      context.setRuntimeFilter(runtimeFilterWritable);
+      context.addRuntimeFilter(runtimeFilterWritable);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
similarity index 87%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index e3f89a6..5a8c6fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.work.filter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.Consumer;
@@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.BitData;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
@@ -60,17 +59,14 @@ import java.util.concurrent.ConcurrentHashMap;
  * The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
  * To Partitioned case:
  * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate
- * the SV2.
+ * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which is downstream
+ * to the Scan node will aggregate all the received RuntimeFilter and will leverage it to filter out the
+ * scanned rows to generate the SV2.
  * To Broadcast case:
  * The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the
  * RuntimeFilterRecordBath is the same as the Partitioned one.
- *
- *
- *
  */
-public class RuntimeFilterManager {
+public class RuntimeFilterRouter {
 
   private Wrapper rootWrapper;
   //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
@@ -79,14 +75,12 @@ public class RuntimeFilterManager {
   private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
   //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
   private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
-  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>();
 
   private DrillbitContext drillbitContext;
 
   private SendingAccountor sendingAccountor = new SendingAccountor();
 
-  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
 
   /**
    * This class maintains context for the runtime join push down's filter management. It
@@ -95,7 +89,7 @@ public class RuntimeFilterManager {
    * @param workUnit
    * @param drillbitContext
    */
-  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
+  public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
     this.rootWrapper = workUnit.getRootWrapper();
     this.drillbitContext = drillbitContext;
   }
@@ -134,32 +128,16 @@ public class RuntimeFilterManager {
    * @param runtimeFilterWritable
    */
   public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
-    BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef();
-    int majorId = runtimeFilterB.getMajorFragmentId();
-    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
-    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
-      size--;
-      joinMjId2scanSize.put(majorId, size);
-    }
-    if (size == 0) {
-      broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
-    }
+    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
   }
 
 
-  private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) {
+  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+    DrillBuf[] data = srcRuntimeFilterWritable.getData();
     List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
     int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
     for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
@@ -172,10 +150,8 @@ public class RuntimeFilterManager {
         .setMajorFragmentId(scanNodeMjId)
         .setMinorFragmentId(minorId)
         .build();
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId);
-      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData());
+      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
       CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-
       DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
       Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
         @Override
@@ -235,8 +211,6 @@ public class RuntimeFilterManager {
 
   private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
 
-    private PhysicalOperator targetOp;
-
     private Fragment fragment;
 
     private boolean contain = false;
@@ -251,7 +225,6 @@ public class RuntimeFilterManager {
 
 
     public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
-      this.targetOp = targetOp;
       this.fragment = fragment;
       this.targetIsGroupScan = targetOp instanceof GroupScan;
       this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
@@ -343,13 +316,10 @@ public class RuntimeFilterManager {
 
     private int probeSideScanMajorId;
 
-
-
     private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;
 
     private RuntimeFilterDef runtimeFilterDef;
 
-
     public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
       return probeSideScanEndpoints;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
new file mode 100644
index 0000000..8f4c823
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This sink receives the RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, supplies the aggregated
+ * one to the fragment running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
+
+  private AtomicInteger currentBookId = new AtomicInteger(0);
+
+  private int staleBookId = 0;
+
+  private RuntimeFilterWritable aggregated = null;
+
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+
+  private AtomicBoolean running = new AtomicBoolean(true);
+
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
+  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
+          try {
+            aggregatedRFLock.lock();
+            aggregated.close();
+            aggregated = null;
+          } finally {
+            aggregatedRFLock.unlock();
+          }
+          currentBookId.set(0);
+          staleBookId = 0;
+          clearQueued();
+        }
+      }
+      rfQueue.add(runtimeFilterWritable);
+    } else {
+      runtimeFilterWritable.close();
+    }
+  }
+
+  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+    try {
+      aggregatedRFLock.lock();
+      return aggregated.duplicate(bufferAllocator);
+    } finally {
+      aggregatedRFLock.unlock();
+    }
+  }
+
+  /**
+   * whether there's a fresh aggregated RuntimeFilter
+   *
+   * @return
+   */
+  public boolean hasFreshOne() {
+    if (currentBookId.get() > staleBookId) {
+      staleBookId = currentBookId.get();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * whether there's a usable RuntimeFilter.
+   *
+   * @return
+   */
+  public boolean containOne() {
+    return aggregated != null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    running.compareAndSet(true, false);
+    asyncAggregateThread.interrupt();
+    if (containOne()) {
+      try {
+        aggregatedRFLock.lock();
+        aggregated.close();
+      } finally {
+        aggregatedRFLock.unlock();
+      }
+    }
+    clearQueued();
+  }
+
+  private void clearQueued() {
+    RuntimeFilterWritable toClear;
+    while ((toClear = rfQueue.poll()) != null) {
+      toClear.close();
+    }
+  }
+
+  class AsyncAggregateWorker implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        while (running.get()) {
+          RuntimeFilterWritable toAggregate = rfQueue.take();
+          if (!running.get()) {
+            toAggregate.close();
+            return;
+          }
+          if (containOne()) {
+            try {
+              aggregatedRFLock.lock();
+              aggregated.aggregate(toAggregate);
+            } finally {
+              aggregatedRFLock.unlock();
+            }
+          } else {
+            aggregated = toAggregate;
+          }
+          currentBookId.incrementAndGet();
+        }
+      } catch (InterruptedException e) {
+        logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+      }
+    }
+  }
+}
+
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 8649e15..302a480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.filter;
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
 
 import java.util.ArrayList;
@@ -29,7 +30,7 @@ import java.util.List;
  * A binary wire transferable representation of the RuntimeFilter which contains
  * the runtime filter definition and its corresponding data.
  */
-public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+public class RuntimeFilterWritable implements AutoCloseables.Closeable{
 
   private BitData.RuntimeFilterBDef runtimeFilterBDef;
 
@@ -81,6 +82,37 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      src.readBytes(duplicateOne, 0, capacity);
+      src.readerIndex(readerIndex);
+      cloned[i] = duplicateOne;
+      i++;
+    }
+    return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+  }
+
+  public boolean same(RuntimeFilterWritable other) {
+    BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
+    int otherMajorId = runtimeFilterDef.getMajorFragmentId();
+    int otherMinorId = runtimeFilterDef.getMinorFragmentId();
+    int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
+    int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
+    int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
+    int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
+    return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  }
+
+  public String toString() {
+    return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+  }
+
   @Override
   public void close() {
     for (DrillBuf buf : data) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 634e832..42b76f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,7 +62,6 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.filter.RuntimeFilterManager;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
@@ -122,7 +122,7 @@ public class Foreman implements Runnable {
 
   private String queryText;
 
-  private RuntimeFilterManager runtimeFilterManager;
+  private RuntimeFilterRouter runtimeFilterRouter;
   private boolean enableRuntimeFilter;
 
   /**
@@ -410,8 +410,8 @@ public class Foreman implements Runnable {
     queryRM.visitAbstractPlan(plan);
     final QueryWorkUnit work = getQueryWorkUnit(plan);
     if (enableRuntimeFilter) {
-      runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext);
-      runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo();
+      runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
+      runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
     }
     if (textPlan != null) {
       queryManager.setPlanText(textPlan.value);
@@ -734,8 +734,8 @@ public class Foreman implements Runnable {
 
       logger.debug(queryIdString + ": cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
-      if (enableRuntimeFilter && runtimeFilterManager != null) {
-        runtimeFilterManager.waitForComplete();
+      if (enableRuntimeFilter && runtimeFilterRouter != null) {
+        runtimeFilterRouter.waitForComplete();
       }
       // remove the channel disconnected listener (doesn't throw)
       closeFuture.removeListener(closeListener);
@@ -866,8 +866,8 @@ public class Foreman implements Runnable {
   }
 
 
-  public RuntimeFilterManager getRuntimeFilterManager() {
-    return runtimeFilterManager;
+  public RuntimeFilterRouter getRuntimeFilterRouter() {
+    return runtimeFilterRouter;
   }
 
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index f867015..81d0d1a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -180,6 +181,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
 
     private ExecutorState executorState = new OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -195,6 +197,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
+      this.runtimeFilterSink = new RuntimeFilterSink(allocator);
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
@@ -315,13 +318,13 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return null;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 1c4779c..559f7f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -203,11 +204,12 @@ public class PhysicalOpUnitTestBase extends ExecTest {
    * </p>
    */
   protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
-    private RuntimeFilterWritable runtimeFilterWritable;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
+      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
     }
 
     @Override
@@ -304,13 +306,13 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      this.runtimeFilterWritable = runtimeFilter;
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      this.runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return runtimeFilterWritable;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
   }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 20abb3b..d7921fc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2518,6 +2518,24 @@ public final class BitData {
      */
     com.google.protobuf.ByteString
         getProbeFieldsBytes(int index);
+
+    // optional int32 hj_op_id = 7;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    boolean hasHjOpId();
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    int getHjOpId();
   }
   /**
    * Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2627,6 +2645,11 @@ public final class BitData {
               probeFields_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000010;
+              hjOpId_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2820,6 +2843,30 @@ public final class BitData {
       return probeFields_.getByteString(index);
     }
 
+    // optional int32 hj_op_id = 7;
+    public static final int HJ_OP_ID_FIELD_NUMBER = 7;
+    private int hjOpId_;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public boolean hasHjOpId() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public int getHjOpId() {
+      return hjOpId_;
+    }
+
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
@@ -2827,6 +2874,7 @@ public final class BitData {
       toForeman_ = false;
       bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
       probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      hjOpId_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2858,6 +2906,9 @@ public final class BitData {
       for (int i = 0; i < probeFields_.size(); i++) {
         output.writeBytes(6, probeFields_.getByteString(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt32(7, hjOpId_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2901,6 +2952,10 @@ public final class BitData {
         size += dataSize;
         size += 1 * getProbeFieldsList().size();
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, hjOpId_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3034,6 +3089,8 @@ public final class BitData {
         bitField0_ = (bitField0_ & ~0x00000010);
         probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000020);
+        hjOpId_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -3093,6 +3150,10 @@ public final class BitData {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.probeFields_ = probeFields_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.hjOpId_ = hjOpId_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3141,6 +3202,9 @@ public final class BitData {
           }
           onChanged();
         }
+        if (other.hasHjOpId()) {
+          setHjOpId(other.getHjOpId());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3595,6 +3659,55 @@ public final class BitData {
         return this;
       }
 
+      // optional int32 hj_op_id = 7;
+      private int hjOpId_ ;
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public boolean hasHjOpId() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public int getHjOpId() {
+        return hjOpId_;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder setHjOpId(int value) {
+        bitField0_ |= 0x00000040;
+        hjOpId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder clearHjOpId() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        hjOpId_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
     }
 
@@ -3648,16 +3761,16 @@ public final class BitData {
       " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
       "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
       "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
-      "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" +
+      "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
       "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
       "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
       "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
       "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
-      "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" +
-      "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020",
-      "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" +
-      "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" +
-      "ataH\001"
+      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
+      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
+      "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
+      "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
+      "l.exec.protoB\007BitDataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3687,7 +3800,7 @@ public final class BitData {
           internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
           return null;
         }
       };
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 6aa54dd..3c88ffc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -441,6 +441,8 @@ public final class SchemaBitData
                     output.writeInt32(5, bloomFilterSizeInBytes, true);
                 for(String probeFields : message.getProbeFieldsList())
                     output.writeString(6, probeFields, true);
+                if(message.hasHjOpId())
+                    output.writeInt32(7, message.getHjOpId(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
             {
@@ -499,6 +501,9 @@ public final class SchemaBitData
                         case 6:
                             builder.addProbeFields(input.readString());
                             break;
+                        case 7:
+                            builder.setHjOpId(input.readInt32());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -545,6 +550,7 @@ public final class SchemaBitData
                 case 4: return "toForeman";
                 case 5: return "bloomFilterSizeInBytes";
                 case 6: return "probeFields";
+                case 7: return "hjOpId";
                 default: return null;
             }
         }
@@ -562,6 +568,7 @@ public final class SchemaBitData
             fieldMap.put("toForeman", 4);
             fieldMap.put("bloomFilterSizeInBytes", 5);
             fieldMap.put("probeFields", 6);
+            fieldMap.put("hjOpId", 7);
         }
     }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 99f3c78..2d1c2a7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -55,6 +55,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
     private Boolean toForeman;
     private List<Integer> bloomFilterSizeInBytes;
     private List<String> probeFields;
+    private int hjOpId;
 
     public RuntimeFilterBDef()
     {
@@ -141,6 +142,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         return this;
     }
 
+    // hjOpId
+
+    public int getHjOpId()
+    {
+        return hjOpId;
+    }
+
+    public RuntimeFilterBDef setHjOpId(int hjOpId)
+    {
+        this.hjOpId = hjOpId;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -218,6 +232,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
                         message.probeFields = new ArrayList<String>();
                     message.probeFields.add(input.readString());
                     break;
+                case 7:
+                    message.hjOpId = input.readInt32();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -257,6 +274,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
                     output.writeString(6, probeFields, true);
             }
         }
+
+        if(message.hjOpId != 0)
+            output.writeInt32(7, message.hjOpId, false);
     }
 
     public String getFieldName(int number)
@@ -269,6 +289,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
             case 4: return "toForeman";
             case 5: return "bloomFilterSizeInBytes";
             case 6: return "probeFields";
+            case 7: return "hjOpId";
             default: return null;
         }
     }
@@ -288,6 +309,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         __fieldMap.put("toForeman", 4);
         __fieldMap.put("bloomFilterSizeInBytes", 5);
         __fieldMap.put("probeFields", 6);
+        __fieldMap.put("hjOpId", 7);
     }
     
 }
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 063efe4..15c7230 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -47,4 +47,5 @@ message RuntimeFilterBDef{
   optional bool to_foreman = 4; // true means sending to foreman,false means sending to scan nodes
   repeated int32 bloom_filter_size_in_bytes = 5;
   repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
+  optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
 }


[drill] 01/05: DRILL-6788: Intermittent unit test failure TestDrillbitResilience.failsWhenParsing: Query state should be FAILED (and not COMPLETED) closes #1499

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit eb802710804e87e3b185872e712e7a339823a791
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Wed Oct 10 09:19:33 2018 -0700

    DRILL-6788: Intermittent unit test failure TestDrillbitResilience.failsWhenParsing: Query state should be FAILED (and not COMPLETED)
    closes #1499
---
 .../java/org/apache/drill/exec/server/TestDrillbitResilience.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 9f772e4..c3238ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -742,8 +742,10 @@ public class TestDrillbitResilience extends DrillTest {
 
     final String exceptionDesc = "sql-parsing";
     final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
+    // Inject the failure twice since there can be retry after first failure introduced in DRILL-6762. Retry is because
+    // of version mismatch in local and remote function registry which syncs up lazily.
     final String controls = Controls.newBuilder()
-    .addException(DrillSqlWorker.class, exceptionDesc, exceptionClass)
+      .addException(DrillSqlWorker.class, exceptionDesc, exceptionClass, 0, 2)
       .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);