Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:42 UTC

[drill] 03/05: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 216b1237739935b04c4f54b3f6f05371a4644085
Author: weijie.tong <we...@alipay.com>
AuthorDate: Thu Sep 6 19:23:35 2018 +0800

    DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
---
 .../org/apache/drill/exec/ops/FragmentContext.java |   8 +-
 .../apache/drill/exec/ops/FragmentContextImpl.java |  16 +-
 .../impl/filter/RuntimeFilterRecordBatch.java      |  43 ++++--
 .../exec/physical/impl/join/HashJoinBatch.java     |   5 +-
 .../physical/visitor/RuntimeFilterVisitor.java     |  24 ++-
 .../org/apache/drill/exec/work/WorkManager.java    |   4 +-
 .../exec/work/filter/RuntimeFilterReporter.java    |   5 +-
 ...FilterManager.java => RuntimeFilterRouter.java} |  60 ++------
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 171 +++++++++++++++++++++
 .../exec/work/filter/RuntimeFilterWritable.java    |  34 +++-
 .../apache/drill/exec/work/foreman/Foreman.java    |  16 +-
 .../org/apache/drill/test/OperatorFixture.java     |  11 +-
 .../apache/drill/test/PhysicalOpUnitTestBase.java  |  12 +-
 .../java/org/apache/drill/exec/proto/BitData.java  | 127 ++++++++++++++-
 .../org/apache/drill/exec/proto/SchemaBitData.java |   7 +
 .../drill/exec/proto/beans/RuntimeFilterBDef.java  |  22 +++
 protocol/src/main/protobuf/BitData.proto           |   1 +
 17 files changed, 448 insertions(+), 118 deletions(-)
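
At a high level, this change moves BloomFilter aggregation off the Foreman: each minor fragment now owns a RuntimeFilterSink that queues incoming partial filters and merges them on a background thread, while the Foreman's RuntimeFilterRouter (renamed from RuntimeFilterManager) only forwards filters to the probe-side scan fragments. Below is a minimal, stand-alone Java sketch of that queue-plus-worker aggregation pattern; the names PartialFilter and FilterSinkSketch are hypothetical stand-ins for the RuntimeFilterWritable and RuntimeFilterSink classes in the diff that follows.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for RuntimeFilterWritable: only the OR-merge behaviour matters here.
class PartialFilter {
  final long[] bits;
  PartialFilter(long[] bits) { this.bits = bits.clone(); }
  void aggregate(PartialFilter other) {            // BloomFilter merging is a bitwise OR
    for (int i = 0; i < bits.length; i++) { bits[i] |= other.bits[i]; }
  }
  PartialFilter duplicate() { return new PartialFilter(bits); }
}

// Sketch of the sink: RPC threads enqueue partial filters, a single worker merges them,
// and the operator thread polls for a fresh aggregated snapshot.
class FilterSinkSketch implements AutoCloseable {
  private final BlockingQueue<PartialFilter> queue = new LinkedBlockingQueue<>();
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final AtomicInteger version = new AtomicInteger(0);   // bumped after every merge
  private final Object lock = new Object();
  private PartialFilter aggregated;                             // guarded by lock
  private int seenVersion = 0;                                  // consumer-side bookmark
  private final Thread worker = new Thread(this::drain, "rf-aggregate-sketch");

  FilterSinkSketch() { worker.start(); }

  void add(PartialFilter f) {
    if (running.get()) { queue.add(f); }
  }

  boolean containOne() {
    synchronized (lock) { return aggregated != null; }
  }

  boolean hasFreshOne() {                // true once per newly merged filter
    int v = version.get();
    if (v > seenVersion) { seenVersion = v; return true; }
    return false;
  }

  PartialFilter fetchLatest() {          // callers check containOne() first; hands out a private copy
    synchronized (lock) { return aggregated.duplicate(); }
  }

  private void drain() {
    try {
      while (running.get()) {
        PartialFilter next = queue.take();
        synchronized (lock) {
          if (aggregated == null) { aggregated = next; } else { aggregated.aggregate(next); }
        }
        version.incrementAndGet();
      }
    } catch (InterruptedException e) {
      // interrupted by close(); fall through and exit
    }
  }

  @Override
  public void close() {
    running.set(false);
    worker.interrupt();
  }
}

The real sink additionally duplicates the aggregated buffers through the fragment's BufferAllocator (fetchLatestDuplicatedAggregatedOne) so the operator reads a private copy while the worker keeps merging newer filters.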

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c..88c21d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
   void close();
 
   /**
-   * Return null ,if setRuntimeFilter not being called
    * @return
    */
-  RuntimeFilterWritable getRuntimeFilter();
+  RuntimeFilterSink getRuntimeFilterSink();
 
   /**
-   * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+   * Add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
    * @param runtimeFilter
    */
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
   interface ExecutorState {
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a898078..1f9d489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
 
-  private RuntimeFilterWritable runtimeFilterWritable;
+  private RuntimeFilterSink runtimeFilterSink;
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
+    this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
   }
 
   /**
@@ -348,13 +350,13 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   }
 
   @Override
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterWritable = runtimeFilter;
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+    this.runtimeFilterSink.aggregate(runtimeFilter);
   }
 
   @Override
-  public RuntimeFilterWritable getRuntimeFilter() {
-    return runtimeFilterWritable;
+  public RuntimeFilterSink getRuntimeFilterSink() {
+    return runtimeFilterSink;
   }
 
   /**
@@ -470,8 +472,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterWritable != null) {
-      suppressingClose(runtimeFilterWritable);
+    if (runtimeFilterSink != null) {
+      suppressingClose(runtimeFilterSink);
     }
     suppressingClose(bufferManager);
     suppressingClose(allocator);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580..9248bbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -36,7 +36,9 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -56,6 +58,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
   private Map<String, Integer> field2id = new HashMap<>();
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
+  private RuntimeFilterWritable current;
+  private RuntimeFilterWritable previous;
   private int originalRecordCount;
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
@@ -102,6 +106,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.clear();
     }
     super.close();
+    if (current != null) {
+      current.close();
+    }
   }
 
   @Override
@@ -148,30 +155,36 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
    * schema change hash64 should be reset and this method needs to be called again.
    */
   private void setupHashHelper() {
-    final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
-
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     // Check if RuntimeFilterWritable was received by the minor fragment or not
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       return;
     }
-
-    // Check if bloomFilters is initialized or not
-    if (bloomFilters == null) {
-      bloomFilters = runtimeFilterWritable.unwrap();
+    if (runtimeFilterSink.hasFreshOne()) {
+      RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
+      if (current == null) {
+        current = freshRuntimeFilterWritable;
+        previous = freshRuntimeFilterWritable;
+      } else {
+        previous = current;
+        current = freshRuntimeFilterWritable;
+        previous.close();
+      }
+      bloomFilters = current.unwrap();
     }
-
     // Check if HashHelper is initialized or not
     if (hash64 == null) {
       ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context);
       try {
         //generate hash helper
-        this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+        this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList();
         List<LogicalExpression> hashFieldExps = new ArrayList<>();
         List<TypedFieldId> typedFieldIds = new ArrayList<>();
         for (String toFilterField : toFilterFields) {
           SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
           TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
-          this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+          int[] fieldIds = typedFieldId.getFieldIds();
+          this.field2id.put(toFilterField, fieldIds[0]);
           typedFieldIds.add(typedFieldId);
           ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
@@ -195,11 +208,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.setRecordCount(0);
       return;
     }
-
-    final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     sv2.allocateNew(originalRecordCount);
-
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       // means none of the rows are filtered out hence set all the indexes
       for (int i = 0; i < originalRecordCount; ++i) {
         sv2.setIndex(i, i);
@@ -207,10 +218,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
       sv2.setRecordCount(originalRecordCount);
       return;
     }
-
-    // Setup a hash helper if need be
+    // Setup a hash helper if needed
     setupHashHelper();
-
     //To make each independent bloom filter work together to construct a final filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
     for (int i = 0; i < toFilterFields.size(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 658f03a..3d45696 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -724,7 +724,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
-    //RuntimeFilterManager's judgement will have the RuntimeFilterDef.
+    //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
     if (runtimeFilterDef != null) {
       List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -944,7 +944,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     if (cycleNum == 0 && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman());
+        int hashJoinOpId = this.popConfig.getOperatorId();
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
       }
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index c31e491..bfba5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -197,11 +197,8 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
     @Override
     public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException {
       if (holder != null) {
-        boolean broadcastExchange = exchange instanceof BroadcastExchangePrel;
         if (holder.isFromBuildSide()) {
-          //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark
-          //this HashJoin as BroadcastHashJoin
-          holder.setEncounteredBroadcastExchange(broadcastExchange);
+          holder.setBuildSideExchange(exchange);
         }
       }
       return visitPrel(exchange, holder);
@@ -224,15 +221,15 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
           Prel right = (Prel) hashJoinPrel.getRight();
           holder.setFromBuildSide(true);
           right.accept(this, holder);
-          boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
-          if (buildSideEncountererdBroadcastExchange) {
+          boolean routeToForeman = holder.needToRouteToForeman();
+          if (!routeToForeman) {
             runtimeFilterDef.setSendToForeman(false);
           } else {
             runtimeFilterDef.setSendToForeman(true);
           }
           List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
           for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-            if (buildSideEncountererdBroadcastExchange) {
+            if (!routeToForeman) {
               bloomFilterDef.setLocal(true);
             } else {
               bloomFilterDef.setLocal(false);
@@ -338,18 +335,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
    * RuntimeFilter helper util holder
    */
   private static class RFHelperHolder {
-    //whether this join operator is a partitioned HashJoin or broadcast HashJoin,
-    //also single node HashJoin is not expected to do JPPD.
-    private boolean encounteredBroadcastExchange;
 
     private boolean fromBuildSide;
 
-    public boolean isEncounteredBroadcastExchange() {
-      return encounteredBroadcastExchange;
+    private ExchangePrel exchangePrel;
+
+    public void setBuildSideExchange(ExchangePrel exchange){
+      this.exchangePrel = exchange;
     }
 
-    public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) {
-      this.encounteredBroadcastExchange = encounteredBroadcastExchange;
+    public boolean needToRouteToForeman() {
+      return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel);
     }
 
     public boolean isFromBuildSide() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bf91ed3..0d97e0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -395,7 +395,7 @@ public class WorkManager implements AutoCloseable {
               final String originalName = currentThread.getName();
               currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
               try {
-                foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter);
+                foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
               } catch (Exception e) {
                 logger.warn("Exception while registering the RuntimeFilter", e);
               } finally {
@@ -413,7 +413,7 @@ public class WorkManager implements AutoCloseable {
           .setQueryId(queryId).build();
         FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle);
         if (fragmentExecutor != null) {
-          fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter);
+          fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
         }
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index e6ede7a..6e4a9a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,7 @@ public class RuntimeFilterReporter {
     this.context = context;
   }
 
-  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) {
+  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
     ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
     DrillBuf[] data = new DrillBuf[bloomFilters.size()];
     List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -63,6 +63,7 @@ public class RuntimeFilterReporter {
       .setMajorFragmentId(majorFragmentId)
       .setMinorFragmentId(minorFragmentId)
       .setToForeman(sendToForeman)
+      .setHjOpId(hashJoinOpId)
       .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
       .build();
     RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
@@ -72,7 +73,7 @@ public class RuntimeFilterReporter {
       AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint);
       dataTunnel.sendRuntimeFilter(runtimeFilterWritable);
     } else {
-      context.setRuntimeFilter(runtimeFilterWritable);
+      context.addRuntimeFilter(runtimeFilterWritable);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
similarity index 87%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index e3f89a6..5a8c6fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.work.filter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.Consumer;
@@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.BitData;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
@@ -60,17 +59,14 @@ import java.util.concurrent.ConcurrentHashMap;
  * The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
  * To Partitioned case:
  * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate
- * the SV2.
+ * async and broadcasts them to the Scan nodes' MinorFragments. The RuntimeFilterRecordBatch which is downstream
+ * of the Scan node will aggregate all the received RuntimeFilters and leverage the aggregated result to filter out the
+ * scanned rows to generate the SV2.
  * To Broadcast case:
  * The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the
  * RuntimeFilterRecordBath is the same as the Partitioned one.
- *
- *
- *
  */
-public class RuntimeFilterManager {
+public class RuntimeFilterRouter {
 
   private Wrapper rootWrapper;
   //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
@@ -79,14 +75,12 @@ public class RuntimeFilterManager {
   private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
   //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
   private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
-  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>();
 
   private DrillbitContext drillbitContext;
 
   private SendingAccountor sendingAccountor = new SendingAccountor();
 
-  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
 
   /**
    * This class maintains context for the runtime join push down's filter management. It
@@ -95,7 +89,7 @@ public class RuntimeFilterManager {
    * @param workUnit
    * @param drillbitContext
    */
-  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
+  public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
     this.rootWrapper = workUnit.getRootWrapper();
     this.drillbitContext = drillbitContext;
   }
@@ -134,32 +128,16 @@ public class RuntimeFilterManager {
    * @param runtimeFilterWritable
    */
   public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
-    BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef();
-    int majorId = runtimeFilterB.getMajorFragmentId();
-    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
-    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
-      size--;
-      joinMjId2scanSize.put(majorId, size);
-    }
-    if (size == 0) {
-      broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
-    }
+    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
   }
 
 
-  private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) {
+  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+    DrillBuf[] data = srcRuntimeFilterWritable.getData();
     List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
     int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
     for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
@@ -172,10 +150,8 @@ public class RuntimeFilterManager {
         .setMajorFragmentId(scanNodeMjId)
         .setMinorFragmentId(minorId)
         .build();
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId);
-      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData());
+      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
       CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-
       DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
       Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
         @Override
@@ -235,8 +211,6 @@ public class RuntimeFilterManager {
 
   private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
 
-    private PhysicalOperator targetOp;
-
     private Fragment fragment;
 
     private boolean contain = false;
@@ -251,7 +225,6 @@ public class RuntimeFilterManager {
 
 
     public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
-      this.targetOp = targetOp;
       this.fragment = fragment;
       this.targetIsGroupScan = targetOp instanceof GroupScan;
       this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
@@ -343,13 +316,10 @@ public class RuntimeFilterManager {
 
     private int probeSideScanMajorId;
 
-
-
     private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;
 
     private RuntimeFilterDef runtimeFilterDef;
 
-
     public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
       return probeSideScanEndpoints;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
new file mode 100644
index 0000000..8f4c823
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This sink receives RuntimeFilters from the netty thread,
+ * aggregates them on an async thread, and supplies the aggregated
+ * result to the fragment's running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
+
+  private AtomicInteger currentBookId = new AtomicInteger(0);
+
+  private int staleBookId = 0;
+
+  private RuntimeFilterWritable aggregated = null;
+
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+
+  private AtomicBoolean running = new AtomicBoolean(true);
+
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
+  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This handles the single-fragment case in which two RuntimeFilterRecordBatches
+          //share the same FragmentContext.
+          try {
+            aggregatedRFLock.lock();
+            aggregated.close();
+            aggregated = null;
+          } finally {
+            aggregatedRFLock.unlock();
+          }
+          currentBookId.set(0);
+          staleBookId = 0;
+          clearQueued();
+        }
+      }
+      rfQueue.add(runtimeFilterWritable);
+    } else {
+      runtimeFilterWritable.close();
+    }
+  }
+
+  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+    try {
+      aggregatedRFLock.lock();
+      return aggregated.duplicate(bufferAllocator);
+    } finally {
+      aggregatedRFLock.unlock();
+    }
+  }
+
+  /**
+   * Whether a fresh aggregated RuntimeFilter has arrived since the last check.
+   *
+   * @return true if a new aggregated RuntimeFilter is available, false otherwise
+   */
+  public boolean hasFreshOne() {
+    if (currentBookId.get() > staleBookId) {
+      staleBookId = currentBookId.get();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Whether a usable aggregated RuntimeFilter exists.
+   *
+   * @return true if at least one RuntimeFilter has been aggregated, false otherwise
+   */
+  public boolean containOne() {
+    return aggregated != null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    running.compareAndSet(true, false);
+    asyncAggregateThread.interrupt();
+    if (containOne()) {
+      try {
+        aggregatedRFLock.lock();
+        aggregated.close();
+      } finally {
+        aggregatedRFLock.unlock();
+      }
+    }
+    clearQueued();
+  }
+
+  private void clearQueued() {
+    RuntimeFilterWritable toClear;
+    while ((toClear = rfQueue.poll()) != null) {
+      toClear.close();
+    }
+  }
+
+  class AsyncAggregateWorker implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        while (running.get()) {
+          RuntimeFilterWritable toAggregate = rfQueue.take();
+          if (!running.get()) {
+            toAggregate.close();
+            return;
+          }
+          if (containOne()) {
+            try {
+              aggregatedRFLock.lock();
+              aggregated.aggregate(toAggregate);
+            } finally {
+              aggregatedRFLock.unlock();
+            }
+          } else {
+            aggregated = toAggregate;
+          }
+          currentBookId.incrementAndGet();
+        }
+      } catch (InterruptedException e) {
+        logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+      }
+    }
+  }
+}
+
+
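
As a usage note, the consumer side (RuntimeFilterRecordBatch above) only swaps its bloom filters when the sink reports a fresh aggregate. A tiny driver for the hypothetical sketch near the top of this mail exercises the same polling pattern:

public class FilterSinkSketchDemo {
  public static void main(String[] args) throws Exception {
    try (FilterSinkSketch sink = new FilterSinkSketch()) {
      sink.add(new PartialFilter(new long[]{0b0011}));   // e.g. a locally produced filter
      sink.add(new PartialFilter(new long[]{0b0100}));   // e.g. one routed via the Foreman
      Thread.sleep(100);                                 // give the worker time to merge
      if (sink.containOne() && sink.hasFreshOne()) {
        PartialFilter snapshot = sink.fetchLatest();     // prints 111, the OR of both inputs
        System.out.println(Long.toBinaryString(snapshot.bits[0]));
      }
    }
  }
}
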
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 8649e15..302a480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.filter;
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
 
 import java.util.ArrayList;
@@ -29,7 +30,7 @@ import java.util.List;
  * A binary wire transferable representation of the RuntimeFilter which contains
  * the runtime filter definition and its corresponding data.
  */
-public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+public class RuntimeFilterWritable implements AutoCloseables.Closeable{
 
   private BitData.RuntimeFilterBDef runtimeFilterBDef;
 
@@ -81,6 +82,37 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      src.readBytes(duplicateOne, 0, capacity);
+      src.readerIndex(readerIndex);
+      cloned[i] = duplicateOne;
+      i++;
+    }
+    return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+  }
+
+  public boolean same(RuntimeFilterWritable other) {
+    BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
+    int otherMajorId = runtimeFilterDef.getMajorFragmentId();
+    int otherMinorId = runtimeFilterDef.getMinorFragmentId();
+    int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
+    int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
+    int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
+    int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
+    return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  }
+
+  public String toString() {
+    return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+  }
+
   @Override
   public void close() {
     for (DrillBuf buf : data) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 634e832..42b76f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,7 +62,6 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.filter.RuntimeFilterManager;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
@@ -122,7 +122,7 @@ public class Foreman implements Runnable {
 
   private String queryText;
 
-  private RuntimeFilterManager runtimeFilterManager;
+  private RuntimeFilterRouter runtimeFilterRouter;
   private boolean enableRuntimeFilter;
 
   /**
@@ -410,8 +410,8 @@ public class Foreman implements Runnable {
     queryRM.visitAbstractPlan(plan);
     final QueryWorkUnit work = getQueryWorkUnit(plan);
     if (enableRuntimeFilter) {
-      runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext);
-      runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo();
+      runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
+      runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
     }
     if (textPlan != null) {
       queryManager.setPlanText(textPlan.value);
@@ -734,8 +734,8 @@ public class Foreman implements Runnable {
 
       logger.debug(queryIdString + ": cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
-      if (enableRuntimeFilter && runtimeFilterManager != null) {
-        runtimeFilterManager.waitForComplete();
+      if (enableRuntimeFilter && runtimeFilterRouter != null) {
+        runtimeFilterRouter.waitForComplete();
       }
       // remove the channel disconnected listener (doesn't throw)
       closeFuture.removeListener(closeListener);
@@ -866,8 +866,8 @@ public class Foreman implements Runnable {
   }
 
 
-  public RuntimeFilterManager getRuntimeFilterManager() {
-    return runtimeFilterManager;
+  public RuntimeFilterRouter getRuntimeFilterRouter() {
+    return runtimeFilterRouter;
   }
 
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index f867015..81d0d1a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -180,6 +181,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
 
     private ExecutorState executorState = new OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -195,6 +197,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
+      this.runtimeFilterSink = new RuntimeFilterSink(allocator);
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
@@ -315,13 +318,13 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return null;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 1c4779c..559f7f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -203,11 +204,12 @@ public class PhysicalOpUnitTestBase extends ExecTest {
    * </p>
    */
   protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
-    private RuntimeFilterWritable runtimeFilterWritable;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
+      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
     }
 
     @Override
@@ -304,13 +306,13 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      this.runtimeFilterWritable = runtimeFilter;
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      this.runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return runtimeFilterWritable;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
   }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 20abb3b..d7921fc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2518,6 +2518,24 @@ public final class BitData {
      */
     com.google.protobuf.ByteString
         getProbeFieldsBytes(int index);
+
+    // optional int32 hj_op_id = 7;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    boolean hasHjOpId();
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    int getHjOpId();
   }
   /**
    * Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2627,6 +2645,11 @@ public final class BitData {
               probeFields_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000010;
+              hjOpId_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2820,6 +2843,30 @@ public final class BitData {
       return probeFields_.getByteString(index);
     }
 
+    // optional int32 hj_op_id = 7;
+    public static final int HJ_OP_ID_FIELD_NUMBER = 7;
+    private int hjOpId_;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public boolean hasHjOpId() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public int getHjOpId() {
+      return hjOpId_;
+    }
+
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
@@ -2827,6 +2874,7 @@ public final class BitData {
       toForeman_ = false;
       bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
       probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      hjOpId_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2858,6 +2906,9 @@ public final class BitData {
       for (int i = 0; i < probeFields_.size(); i++) {
         output.writeBytes(6, probeFields_.getByteString(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt32(7, hjOpId_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2901,6 +2952,10 @@ public final class BitData {
         size += dataSize;
         size += 1 * getProbeFieldsList().size();
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, hjOpId_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3034,6 +3089,8 @@ public final class BitData {
         bitField0_ = (bitField0_ & ~0x00000010);
         probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000020);
+        hjOpId_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -3093,6 +3150,10 @@ public final class BitData {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.probeFields_ = probeFields_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.hjOpId_ = hjOpId_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3141,6 +3202,9 @@ public final class BitData {
           }
           onChanged();
         }
+        if (other.hasHjOpId()) {
+          setHjOpId(other.getHjOpId());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3595,6 +3659,55 @@ public final class BitData {
         return this;
       }
 
+      // optional int32 hj_op_id = 7;
+      private int hjOpId_ ;
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public boolean hasHjOpId() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public int getHjOpId() {
+        return hjOpId_;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder setHjOpId(int value) {
+        bitField0_ |= 0x00000040;
+        hjOpId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder clearHjOpId() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        hjOpId_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
     }
 
@@ -3648,16 +3761,16 @@ public final class BitData {
       " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
       "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
       "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
-      "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" +
+      "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
       "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
       "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
       "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
       "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
-      "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" +
-      "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020",
-      "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" +
-      "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" +
-      "ataH\001"
+      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
+      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
+      "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
+      "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
+      "l.exec.protoB\007BitDataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3687,7 +3800,7 @@ public final class BitData {
           internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
           return null;
         }
       };
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 6aa54dd..3c88ffc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -441,6 +441,8 @@ public final class SchemaBitData
                     output.writeInt32(5, bloomFilterSizeInBytes, true);
                 for(String probeFields : message.getProbeFieldsList())
                     output.writeString(6, probeFields, true);
+                if(message.hasHjOpId())
+                    output.writeInt32(7, message.getHjOpId(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
             {
@@ -499,6 +501,9 @@ public final class SchemaBitData
                         case 6:
                             builder.addProbeFields(input.readString());
                             break;
+                        case 7:
+                            builder.setHjOpId(input.readInt32());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -545,6 +550,7 @@ public final class SchemaBitData
                 case 4: return "toForeman";
                 case 5: return "bloomFilterSizeInBytes";
                 case 6: return "probeFields";
+                case 7: return "hjOpId";
                 default: return null;
             }
         }
@@ -562,6 +568,7 @@ public final class SchemaBitData
             fieldMap.put("toForeman", 4);
             fieldMap.put("bloomFilterSizeInBytes", 5);
             fieldMap.put("probeFields", 6);
+            fieldMap.put("hjOpId", 7);
         }
     }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 99f3c78..2d1c2a7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -55,6 +55,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
     private Boolean toForeman;
     private List<Integer> bloomFilterSizeInBytes;
     private List<String> probeFields;
+    private int hjOpId;
 
     public RuntimeFilterBDef()
     {
@@ -141,6 +142,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         return this;
     }
 
+    // hjOpId
+
+    public int getHjOpId()
+    {
+        return hjOpId;
+    }
+
+    public RuntimeFilterBDef setHjOpId(int hjOpId)
+    {
+        this.hjOpId = hjOpId;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -218,6 +232,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
                         message.probeFields = new ArrayList<String>();
                     message.probeFields.add(input.readString());
                     break;
+                case 7:
+                    message.hjOpId = input.readInt32();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -257,6 +274,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
                     output.writeString(6, probeFields, true);
             }
         }
+
+        if(message.hjOpId != 0)
+            output.writeInt32(7, message.hjOpId, false);
     }
 
     public String getFieldName(int number)
@@ -269,6 +289,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
             case 4: return "toForeman";
             case 5: return "bloomFilterSizeInBytes";
             case 6: return "probeFields";
+            case 7: return "hjOpId";
             default: return null;
         }
     }
@@ -288,6 +309,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
         __fieldMap.put("toForeman", 4);
         __fieldMap.put("bloomFilterSizeInBytes", 5);
         __fieldMap.put("probeFields", 6);
+        __fieldMap.put("hjOpId", 7);
     }
     
 }
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 063efe4..15c7230 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -47,4 +47,5 @@ message RuntimeFilterBDef{
   optional bool to_foreman = 4; // true means sending to foreman,false means sending to scan nodes
   repeated int32 bloom_filter_size_in_bytes = 5;
   repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
+  optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
 }