You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/29 20:27:30 UTC

[GitHub] asfgit closed pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…

asfgit closed pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index fb465725906..c4d7652f3f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -153,6 +153,10 @@ private ExecConstants() {
   public static final IntegerValidator HASHJOIN_BLOOM_FILTER_MAX_SIZE = new IntegerValidator(HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY, null);
   public static final String HASHJOIN_BLOOM_FILTER_FPP_KEY = "exec.hashjoin.bloom_filter.fpp";
   public static final DoubleValidator HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR = new RangeDoubleValidator(HASHJOIN_BLOOM_FILTER_FPP_KEY, Double.MIN_VALUE, 1.0, null);
+  public static final String HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY = "exec.hashjoin.runtime_filter.waiting.enable";
+  public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
+  public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time";
+  public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
 
 
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 88c21d9e957..5125f720192 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -20,8 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
+import java.util.concurrent.TimeUnit;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -159,18 +158,23 @@ BufferAllocator getNewChildAllocator(final String operatorName,
 
   @Override
   void close();
-
-  /**
-   * @return
-   */
-  RuntimeFilterSink getRuntimeFilterSink();
-
   /**
    * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
    * @param runtimeFilter
    */
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier);
+
+  /**
+   * get the RuntimeFilter with a blocking wait, if the waiting option is enabled
+   * @param rfIdentifier
+   * @param maxWaitTime
+   * @param timeUnit
+   * @return the RFW or null
+   */
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit);
+
   interface ExecutorState {
     /**
      * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 6e40466e8e5..b740c927daf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -21,7 +21,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -60,8 +65,6 @@
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -115,6 +118,10 @@
   private final BufferManager bufferManager;
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
+  private boolean enableRuntimeFilter;
+  private boolean enableRFWaiting;
+  private Lock lock4RF;
+  private Condition condition4RF;
 
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) {
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
+  private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -209,10 +216,11 @@ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment f
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
-    boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
-    if (enableRF) {
-      ExecutorService executorService = context.getExecutor();
-      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+    enableRuntimeFilter = this.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
+    enableRFWaiting = this.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val && enableRuntimeFilter;
+    if (enableRFWaiting) {
+      lock4RF = new ReentrantLock();
+      condition4RF = lock4RF.newCondition();
     }
   }
 
@@ -362,12 +370,50 @@ public boolean isUserAuthenticationEnabled() {
 
   @Override
   public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterSink.aggregate(runtimeFilter);
+    long rfIdentifier = runtimeFilter.getRuntimeFilterBDef().getRfIdentifier();
+    //if the RF was sent directly from the HJ nodes, we don't need to retain the buffer again
+    // as the RuntimeFilterReporter has already retained the buffer
+    rfIdentifier2fetched.put(rfIdentifier, false);
+    rfIdentifier2RFW.put(rfIdentifier, runtimeFilter);
+    if (enableRFWaiting) {
+      lock4RF.lock();
+      try {
+        condition4RF.signal();
+      } catch (Exception e) {
+        logger.info("fail to signal the waiting thread.", e);
+      } finally {
+        lock4RF.unlock();
+      }
+    }
+  }
+
+  @Override
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+    RuntimeFilterWritable runtimeFilterWritable = rfIdentifier2RFW.get(rfIdentifier);
+    if (runtimeFilterWritable != null) {
+      rfIdentifier2fetched.put(rfIdentifier, true);
+    }
+    return runtimeFilterWritable;
   }
 
   @Override
-  public RuntimeFilterSink getRuntimeFilterSink() {
-    return runtimeFilterSink;
+  public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit) {
+    if (rfIdentifier2RFW.get(rfIdentifier) != null) {
+      return getRuntimeFilter(rfIdentifier);
+    }
+    if (enableRFWaiting) {
+      lock4RF.lock();
+      try {
+        if (rfIdentifier2RFW.get(rfIdentifier) == null) {
+          condition4RF.await(maxWaitTime, timeUnit);
+        }
+      } catch (InterruptedException e) {
+        logger.info("Condition was interrupted", e);
+      } finally {
+        lock4RF.unlock();
+      }
+    }
+    return getRuntimeFilter(rfIdentifier);
   }
 
   /**
@@ -484,12 +530,11 @@ public void close() {
     // Close the buffers before closing the operators; this is needed as buffer ownership
     // is attached to the receive operators.
     suppressingClose(buffers);
-
+    closeNotConsumedRFWs();
     // close operator context
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
@@ -550,4 +595,15 @@ public boolean isBuffersDone() {
   protected BufferManager getBufferManager() {
     return bufferManager;
   }
+
+  private void closeNotConsumedRFWs() {
+    for (RuntimeFilterWritable runtimeFilterWritable : rfIdentifier2RFW.values()){
+      long rfIdentifier = runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier();
+      boolean fetchedByOperator = rfIdentifier2fetched.get(rfIdentifier);
+      if (!fetchedByOperator) {
+        //if the RF hasn't been consumed by the operator, we have to released it one more time.
+        runtimeFilterWritable.close();
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index ac867020de1..da590689d18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -21,6 +21,7 @@
 import org.apache.drill.exec.physical.impl.SingleSenderCreator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec;
+import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
 import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
 import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
@@ -61,6 +62,7 @@
     register(CoreOperatorType.LATERAL_JOIN_VALUE, AbstractBinaryRecordBatch.Metric.class);
     register(CoreOperatorType.UNNEST_VALUE, UnnestRecordBatch.Metric.class);
     register(CoreOperatorType.UNION_VALUE, AbstractBinaryRecordBatch.Metric.class);
+    register(CoreOperatorType.RUNTIME_FILTER_VALUE, RuntimeFilterRecordBatch.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
index 50c00d792aa..b35bf29afe1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RuntimeFilterPOP.java
@@ -31,9 +31,12 @@
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPOP.class);
 
+  private long identifier;
+
   @JsonCreator
-  public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child) {
+  public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child, @JsonProperty("identifier")long identifier) {
     super(child);
+    this.identifier = identifier;
   }
 
   @Override
@@ -43,7 +46,7 @@ public RuntimeFilterPOP(@JsonProperty("child") PhysicalOperator child) {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new RuntimeFilterPOP(child);
+    return new RuntimeFilterPOP(child, identifier);
   }
 
   @Override
@@ -55,4 +58,13 @@ public SelectionVectorMode getSVMode() {
   public int getOperatorType() {
     return CoreOperatorType.RUNTIME_FILTER_VALUE;
   }
+
+
+  public long getIdentifier() {
+    return identifier;
+  }
+
+  public void setIdentifier(long identifier) {
+    this.identifier = identifier;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index 9248bbc698e..bf7ed79121d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -22,11 +22,13 @@
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -36,14 +38,13 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
@@ -59,12 +60,21 @@
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
   private RuntimeFilterWritable current;
-  private RuntimeFilterWritable previous;
   private int originalRecordCount;
+  private long filteredRows = 0l;
+  private long appliedTimes = 0l;
+  private int batchTimes = 0;
+  private boolean waited = false;
+  private boolean enableRFWaiting;
+  private long maxWaitingTime;
+  private long rfIdentifier;
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
   public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
+    enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
+    maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
+    this.rfIdentifier = pop.getIdentifier();
   }
 
   @Override
@@ -89,7 +99,6 @@ public SelectionVector4 getSelectionVector4() {
 
   @Override
   protected IterOutcome doWork() {
-    container.transferIn(incoming.getContainer());
     originalRecordCount = incoming.getRecordCount();
     sv2.setBatchActualRecordCount(originalRecordCount);
     try {
@@ -97,6 +106,8 @@ protected IterOutcome doWork() {
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException(e);
     }
+    container.transferIn(incoming.getContainer());
+    updateStats();
     return getFinalOutcome(false);
   }
 
@@ -155,21 +166,11 @@ protected boolean setupNewSchema() throws SchemaChangeException {
    * schema change hash64 should be reset and this method needs to be called again.
    */
   private void setupHashHelper() {
-    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
-    // Check if RuntimeFilterWritable was received by the minor fragment or not
-    if (!runtimeFilterSink.containOne()) {
+    current = context.getRuntimeFilter(rfIdentifier);
+    if (current == null) {
       return;
     }
-    if (runtimeFilterSink.hasFreshOne()) {
-      RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
-      if (current == null) {
-        current = freshRuntimeFilterWritable;
-        previous = freshRuntimeFilterWritable;
-      } else {
-        previous = current;
-        current = freshRuntimeFilterWritable;
-        previous.close();
-      }
+    if (bloomFilters == null) {
       bloomFilters = current.unwrap();
     }
     // Check if HashHelper is initialized or not
@@ -189,8 +190,7 @@ private void setupHashHelper() {
           ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
         }
-        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]),
-          typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
+        hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()]));
       } catch (Exception e) {
         throw UserException.internalError(e).build(logger);
       }
@@ -208,9 +208,11 @@ private void applyRuntimeFilter() throws SchemaChangeException {
       sv2.setRecordCount(0);
       return;
     }
-    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
+    current = context.getRuntimeFilter(rfIdentifier);
+    timedWaiting();
+    batchTimes++;
     sv2.allocateNew(originalRecordCount);
-    if (!runtimeFilterSink.containOne()) {
+    if (current == null) {
       // means none of the rows are filtered out hence set all the indexes
       for (int i = 0; i < originalRecordCount; ++i) {
         sv2.setIndex(i, i);
@@ -227,21 +229,17 @@ private void applyRuntimeFilter() throws SchemaChangeException {
       String fieldName = toFilterFields.get(i);
       computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
     }
-
     int svIndex = 0;
-    int tmpFilterRows = 0;
     for (int i = 0; i < originalRecordCount; i++) {
       boolean contain = bitSet.get(i);
       if (contain) {
         sv2.setIndex(svIndex, i);
         svIndex++;
       } else {
-        tmpFilterRows++;
+        filteredRows++;
       }
     }
-
-    logger.debug("RuntimeFiltered has filtered out {} rows from incoming with {} rows",
-      tmpFilterRows, originalRecordCount);
+    appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
 
@@ -263,4 +261,34 @@ public void dump() {
         + "originalRecordCount={}, batchSchema={}]",
         container, sv2, toFilterFields, originalRecordCount, incoming.getSchema());
   }
+
+  public enum Metric implements MetricDef {
+    FILTERED_ROWS, APPLIED_TIMES;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
+  public void updateStats() {
+    stats.setLongStat(Metric.FILTERED_ROWS, filteredRows);
+    stats.setLongStat(Metric.APPLIED_TIMES, appliedTimes);
+  }
+
+  private void timedWaiting() {
+    if (!enableRFWaiting || waited) {
+      return;
+    }
+    //Downstream HashJoinBatch prefetch first batch from both sides in buildSchema phase hence waiting is done post that phase
+    if (current == null && batchTimes > 0) {
+      waited = true;
+      try {
+        stats.startWait();
+        current = context.getRuntimeFilter(rfIdentifier, maxWaitingTime, TimeUnit.MILLISECONDS);
+      } finally {
+        stats.stopWait();
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 88eadf29115..0ac0809d8f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,10 +19,13 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -203,11 +206,13 @@
   private int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
   private int maxBatchesInMemory;
-  private List<BloomFilter> bloomFilters = new ArrayList<>();
   private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
   private boolean enableRuntimeFilter;
   private RuntimeFilterReporter runtimeFilterReporter;
   private ValueVectorHashHelper.Hash64 hash64;
+  private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+  private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+  private List<BloomFilter> bloomFilters = new ArrayList<>();
 
   /**
    * This holds information about the spilled partitions for the build and probe side.
@@ -757,6 +762,24 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException
       enableRuntimeFilter = false;
       return;
     }
+    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+    List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+    for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+      String buildField = bloomFilterDef.getBuildField();
+      SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+      TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+      if (typedFieldId == null) {
+        missingField = true;
+        break;
+      }
+      int fieldId = typedFieldId.getFieldIds()[0];
+      bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+    }
+    if (missingField) {
+      logger.info("As some build side join key fields not found, runtime filter was disabled");
+      enableRuntimeFilter = false;
+      return;
+    }
     ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
     try {
       hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
@@ -799,9 +822,6 @@ private void initializeRuntimeFilter() {
     if (!enableRuntimeFilter) {
       return;
     }
-    if (runtimeFilterReporter != null) {
-      return;
-    }
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
@@ -809,11 +829,13 @@ private void initializeRuntimeFilter() {
     if (runtimeFilterDef != null) {
       List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+        int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
         int numBytes = bloomFilterDef.getNumBytes();
         String probeField =  bloomFilterDef.getProbeField();
         probeFields.add(probeField);
         BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
         bloomFilters.add(bloomFilter);
+        bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
   }
@@ -992,13 +1014,12 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
         //create runtime filter
         if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
-          int condFieldIndex = 0;
-          for (BloomFilter bloomFilter : bloomFilters) {
+          for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
+            int fieldId = bloomFilter2buildId.get(bloomFilter);
             for (int ind = 0; ind < currentRecordCount; ind++) {
-              long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+              long hashCode = hash64.hash64Code(ind, 0, fieldId);
               bloomFilter.insert(hashCode);
             }
-            condFieldIndex++;
           }
         }
 
@@ -1027,9 +1048,9 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
     }
 
     if (spilledState.isFirstCycle() && enableRuntimeFilter) {
-      if (bloomFilters.size() > 0) {
+      if (bloomFilter2buildId.size() > 0) {
         int hashJoinOpId = this.popConfig.getOperatorId();
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
       }
     }
 
@@ -1237,7 +1258,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
       "configured output batch size: %d", configuredBatchSize);
 
-    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
index 59e1622b501..1729027ded9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
@@ -27,25 +27,29 @@
 import java.io.IOException;
 import java.util.List;
 
-public class RuntimeFilterPrel extends SinglePrel{
+public class RuntimeFilterPrel extends SinglePrel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
 
-  public RuntimeFilterPrel(Prel child){
+  private long identifier;
+
+  public RuntimeFilterPrel(Prel child, long identifier){
     super(child.getCluster(), child.getTraitSet(), child);
+    this.identifier = identifier;
   }
 
-  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+  public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) {
     super(cluster, traits, child);
+    this.identifier = identifier;
   }
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+    return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier);
   }
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    RuntimeFilterPOP r =  new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator));
+    RuntimeFilterPOP r =  new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier);
     return creator.addMetadata(this, r);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index fcfa2bca1fc..4d309aea030 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical.visitor;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -28,7 +27,6 @@
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.physical.BroadcastExchangePrel;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.HashAggPrel;
@@ -43,11 +41,14 @@
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-
+import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This visitor does two major things:
@@ -58,9 +59,14 @@
 
   private Set<ScanPrel> toAddRuntimeFilter = new HashSet<>();
 
+  private Multimap<ScanPrel, HashJoinPrel> probeSideScan2hj = HashMultimap.create();
+
   private double fpp;
+
   private int bloomFilterMaxSizeInBytesDef;
 
+  private static final AtomicLong rfIdCounter = new AtomicLong();
+
   private RuntimeFilterVisitor(QueryContext queryContext) {
     this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue();
     this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val;
@@ -76,7 +82,7 @@ public static Prel addRuntimeFilter(Prel prel, QueryContext queryContext) {
   }
 
   public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
-    List<RelNode> children = Lists.newArrayList();
+    List<RelNode> children = new ArrayList<>();
     for (Prel child : prel) {
       child = child.accept(this, value);
       children.add(child);
@@ -100,8 +106,18 @@ public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
   @Override
   public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException {
     if (toAddRuntimeFilter.contains(prel)) {
-      //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node.
-      RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel);
+      //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node.
+      Collection<HashJoinPrel> hashJoinPrels = probeSideScan2hj.get(prel);
+      RuntimeFilterPrel runtimeFilterPrel = null;
+      for (HashJoinPrel hashJoinPrel : hashJoinPrels) {
+        long identifier = rfIdCounter.incrementAndGet();
+        hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier);
+        if (runtimeFilterPrel == null) {
+          runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier);
+        } else {
+          runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier);
+        }
+      }
       return runtimeFilterPrel;
     } else {
       return prel;
@@ -134,13 +150,24 @@ private RuntimeFilterDef generateRuntimeFilter(HashJoinPrel hashJoinPrel) {
 
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     //find the possible left scan node of the left join key
-    GroupScan groupScan = null;
+    ScanPrel probeSideScanPrel = null;
     RelNode left = hashJoinPrel.getLeft();
+    RelNode right = hashJoinPrel.getRight();
+    ExchangePrel exchangePrel = findRightExchangePrel(right);
+    if (exchangePrel == null) {
+      //Does not support the single fragment mode ,that is the right build side
+      //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+      return null;
+    }
     List<String> leftFields = left.getRowType().getFieldNames();
+    List<String> rightFields = right.getRowType().getFieldNames();
     List<Integer> leftKeys = hashJoinPrel.getLeftKeys();
     RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
+    int i = 0;
     for (Integer leftKey : leftKeys) {
       String leftFieldName = leftFields.get(leftKey);
+      String rightFieldName = rightFields.get(i);
+      i++;
       //This also avoids the left field of the join condition with a function call.
       ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left);
       if (scanPrel != null) {
@@ -160,17 +187,17 @@ private RuntimeFilterDef generateRuntimeFilter(HashJoinPrel hashJoinPrel) {
         int bloomFilterSizeInBytes = BloomFilter.optimalNumOfBytes(ndv.longValue(), fpp);
         bloomFilterSizeInBytes = bloomFilterSizeInBytes > bloomFilterMaxSizeInBytesDef ? bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes;
         //left the local parameter to be set later.
-        BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName);
+        BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName);
         bloomFilterDef.setLeftNDV(ndv);
         bloomFilterDefs.add(bloomFilterDef);
         toAddRuntimeFilter.add(scanPrel);
-        groupScan = scanPrel.getGroupScan();
+        probeSideScanPrel = scanPrel;
       }
     }
     if (bloomFilterDefs.size() > 0) {
       //left sendToForeman parameter to be set later.
-      RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false);
-      runtimeFilterDef.setProbeSideGroupScan(groupScan);
+      RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
+      probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel);
       return runtimeFilterDef;
     }
     return null;
@@ -265,6 +292,30 @@ private ScanPrel findLeftScanPrel(String fieldName, RelNode leftRelNode) {
     }
   }
 
+  private ExchangePrel findRightExchangePrel(RelNode rightRelNode) {
+    if (rightRelNode instanceof ExchangePrel) {
+      return (ExchangePrel) rightRelNode;
+    }
+    if (rightRelNode instanceof ScanPrel) {
+      return null;
+    } else if (rightRelNode instanceof RelSubset) {
+      RelNode bestNode = ((RelSubset) rightRelNode).getBest();
+      if (bestNode != null) {
+        return findRightExchangePrel(bestNode);
+      } else {
+        return null;
+      }
+    } else {
+      List<RelNode> relNodes = rightRelNode.getInputs();
+      if (relNodes.size() == 1) {
+        RelNode leftNode = relNodes.get(0);
+        return findRightExchangePrel(leftNode);
+      } else {
+        return null;
+      }
+    }
+  }
+
   private boolean containBlockNode(Prel startNode, Prel endNode) {
     BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor();
     startNode.accept(blockNodeVisitor, endNode);
@@ -311,6 +362,11 @@ public Void visitPrel(Prel prel, Prel endValue) throws RuntimeException {
         return null;
       }
 
+      if (currentPrel instanceof HashJoinPrel) {
+        encounteredBlockNode = true;
+        return null;
+      }
+
       for (Prel subPrel : currentPrel) {
         visitPrel(subPrel, endValue);
       }
@@ -349,4 +405,4 @@ public void setFromBuildSide(boolean fromBuildSide) {
     }
 
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 37934c8e9cf..c97220cef98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -129,6 +129,8 @@
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
+      new OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
+      new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
       // ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
       new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
       new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 0d97e0ac308..7915843ebb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -379,11 +379,16 @@ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) {
       return runningFragments.get(handle);
     }
 
+    /**
+     * receive the RuntimeFilter thorough the wire
+     * @param runtimeFilter
+     */
     public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
       BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
       boolean toForeman = runtimeFilterDef.getToForeman();
       QueryId queryId = runtimeFilterDef.getQueryId();
       String queryIdStr = QueryIdHelper.getQueryId(queryId);
+      runtimeFilter.retainBuffers(1);
       //to foreman
       if (toForeman) {
         Foreman foreman = queries.get(queryId);
@@ -393,13 +398,14 @@ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
             public void run() {
               final Thread currentThread = Thread.currentThread();
               final String originalName = currentThread.getName();
-              currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
+              currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
               try {
-                foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
+                foreman.getRuntimeFilterRouter().register(runtimeFilter);
               } catch (Exception e) {
                 logger.warn("Exception while registering the RuntimeFilter", e);
               } finally {
                 currentThread.setName(originalName);
+                runtimeFilter.close();
               }
             }
           });
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
index dc6cc2fcf64..afbc56a5bd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
@@ -34,6 +34,7 @@
 public class BloomFilter {
   // Bytes in a bucket.
   private static final int BYTES_PER_BUCKET = 32;
+
   // Minimum bloom filter data size.
   private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256;
 
@@ -41,16 +42,14 @@
 
   private int numBytes;
 
-  private int mask[] = new int[8];
-
-  private byte[] tempBucket = new byte[32];
-
+  private int bucketMask[] = new int[8];
 
   public BloomFilter(int numBytes, BufferAllocator bufferAllocator) {
     int size = BloomFilter.adjustByteSize(numBytes);
     this.byteBuf = bufferAllocator.buffer(size);
     this.numBytes = byteBuf.capacity();
-    this.byteBuf.writerIndex(numBytes);
+    this.byteBuf.writeZero(this.numBytes);
+    this.byteBuf.writerIndex(this.numBytes);
   }
 
   public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) {
@@ -74,26 +73,27 @@ public static int adjustByteSize(int numBytes) {
   }
 
   private void setMask(int key) {
-    //8 odd numbers act as salt value to participate in the computation of the mask.
+    //8 odd numbers act as salt value to participate in the computation of the bucketMask.
     final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
 
-    Arrays.fill(mask, 0);
+    Arrays.fill(bucketMask, 0);
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = key * SALT[i];
+      bucketMask[i] = key * SALT[i];
     }
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = mask[i] >> 27;
+      bucketMask[i] = bucketMask[i] >>> 27;
     }
 
     for (int i = 0; i < 8; ++i) {
-      mask[i] = 0x1 << mask[i];
+      bucketMask[i] = 0x1 << bucketMask[i];
     }
   }
 
   /**
    * Add an element's hash value to this bloom filter.
+   *
    * @param hash hash result of element.
    */
   public void insert(long hash) {
@@ -101,16 +101,13 @@ public void insert(long hash) {
     int key = (int) hash;
     setMask(key);
     int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
-    byteBuf.getBytes(initialStartIndex, tempBucket);
     for (int i = 0; i < 8; i++) {
+      int index = initialStartIndex + i * 4;
       //every iterate batch,we set 32 bits
-      int bitsetIndex = i * 4;
-      tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24));
-      tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16));
-      tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8));
-      tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i]));
+      int a = byteBuf.getInt(index);
+      a |= bucketMask[i];
+      byteBuf.setInt(index, a);
     }
-    byteBuf.setBytes(initialStartIndex, tempBucket);
   }
 
   /**
@@ -123,17 +120,12 @@ public boolean find(long hash) {
     int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
     int key = (int) hash;
     setMask(key);
-
     int startIndex = bucketIndex * BYTES_PER_BUCKET;
-    byteBuf.getBytes(startIndex, tempBucket);
     for (int i = 0; i < 8; i++) {
-      byte set = 0;
-      int bitsetIndex = i * 4;
-      set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
-      set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
-      set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
-      set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
-      if (0 == set) {
+      int index = startIndex + i * 4;
+      int a = byteBuf.getInt(index);
+      int b = a & bucketMask[i];
+      if (b == 0) {
         return false;
       }
     }
@@ -142,6 +134,7 @@ public boolean find(long hash) {
 
   /**
    * Merge this bloom filter with other one
+   *
    * @param other
    */
   public void or(BloomFilter other) {
@@ -150,20 +143,19 @@ public void or(BloomFilter other) {
     Preconditions.checkArgument(otherLength == thisLength);
     Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0);
     Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0);
-    byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET];
-    for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) {
-      byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket);
-      other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket);
-      for (int j = 0; j < BYTES_PER_BUCKET; j++) {
-        tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]);
-      }
-      this.byteBuf.setBytes(i, tempBucket);
+    for (int i = 0; i < thisLength / 8; i++) {
+      int index = i * 8;
+      long a = byteBuf.getLong(index);
+      long b = other.byteBuf.getLong(index);
+      long c = a | b;
+      byteBuf.setLong(index, c);
     }
   }
 
   /**
    * Calculate optimal size according to the number of distinct values and false positive probability.
    * See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula.
+   *
    * @param ndv: The number of distinct values.
    * @param fpp: The false positive probability.
    * @return optimal number of bytes of given ndv and fpp.
@@ -177,7 +169,7 @@ public static int optimalNumOfBytes(long ndv, double fpp) {
     bits |= bits >> 8;
     bits |= bits >> 16;
     bits++;
-    int bytes = bits/8;
+    int bytes = bits / 8;
     return bytes;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
index 9a6df57aeb3..b2a9bd76337 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
@@ -28,6 +28,8 @@
   private boolean local;
 
   private String probeField;
+
+  private String buildField;
   //TODO
   @JsonIgnore
   private Double leftNDV;
@@ -37,10 +39,11 @@
 
   @JsonCreator
   public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField")
-                        String probeField){
+                        String probeField, @JsonProperty("buildField") String buildField){
     this.numBytes = numBytes;
     this.local = local;
     this.probeField = probeField;
+    this.buildField = buildField;
   }
 
 
@@ -61,7 +64,7 @@ public String getProbeField() {
   }
 
   public String toString() {
-    return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }";
+    return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }";
   }
 
   @JsonIgnore
@@ -82,4 +85,9 @@ public void setRightNDV(Double rightNDV) {
     this.rightNDV = rightNDV;
   }
 
+  public String getBuildField()
+  {
+    return buildField;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
index 5fb51bf2478..efe300f3a45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
@@ -18,13 +18,8 @@
 package org.apache.drill.exec.work.filter;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.physical.base.GroupScan;
-
-
 import java.util.List;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
@@ -37,17 +32,18 @@
   private List<BloomFilterDef> bloomFilterDefs;
 
   private boolean sendToForeman;
-  @JsonIgnore
-  private GroupScan probeSideGroupScan;
 
+  private long runtimeFilterIdentifier;
 
   @JsonCreator
   public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter,
-                          @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) {
+                          @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman,
+                          @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) {
     this.generateBloomFilter = generateBloomFilter;
     this.generateMinMaxFilter = generateMinMaxFilter;
     this.bloomFilterDefs = bloomFilterDefs;
     this.sendToForeman = sendToForeman;
+    this.runtimeFilterIdentifier = runtimeFilterIdentifier;
   }
 
 
@@ -84,12 +80,11 @@ public void setSendToForeman(boolean sendToForeman) {
     this.sendToForeman = sendToForeman;
   }
 
-  @JsonIgnore
-  public GroupScan getProbeSideGroupScan() {
-    return probeSideGroupScan;
+  public long getRuntimeFilterIdentifier() {
+    return runtimeFilterIdentifier;
   }
 
-  public void setProbeSideGroupScan(GroupScan probeSideGroupScan) {
-    this.probeSideGroupScan = probeSideGroupScan;
+  public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) {
+    this.runtimeFilterIdentifier = runtimeFilterIdentifier;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index 6e4a9a8e511..93736c5f214 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,9 @@ public RuntimeFilterReporter(ExecutorFragmentContext context) {
     this.context = context;
   }
 
-  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
+  public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) {
+    boolean sendToForeman = runtimeFilterDef.isSendToForeman();
+    long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
     ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
     DrillBuf[] data = new DrillBuf[bloomFilters.size()];
     List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -64,6 +66,7 @@ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, bo
       .setMinorFragmentId(minorFragmentId)
       .setToForeman(sendToForeman)
       .setHjOpId(hashJoinOpId)
+      .setRfIdentifier(rfIdentifier)
       .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
       .build();
     RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index 5a8c6fc9e1f..a4946a96ce6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -17,39 +17,24 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.DrillBuf;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.Consumer;
 import org.apache.drill.exec.ops.SendingAccountor;
-import org.apache.drill.exec.ops.StatusHandler;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.Wrapper;
-import org.apache.drill.exec.proto.BitData;
 import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class manages the RuntimeFilter routing information of the pushed down join predicate
@@ -69,29 +54,24 @@
 public class RuntimeFilterRouter {
 
   private Wrapper rootWrapper;
-  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
-  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>();
-  //HashJoin node's major fragment id to its corresponding probe side nodes's number
-  private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
-  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
-  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-
-  private DrillbitContext drillbitContext;
 
   private SendingAccountor sendingAccountor = new SendingAccountor();
 
+  private RuntimeFilterSink runtimeFilterSink;
+
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
 
   /**
    * This class maintains context for the runtime join push down's filter management. It
    * does a traversal of the physical operators by leveraging the root wrapper which indirectly
    * holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+   *
    * @param workUnit
    * @param drillbitContext
    */
   public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
     this.rootWrapper = workUnit.getRootWrapper();
-    this.drillbitContext = drillbitContext;
+    runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor);
   }
 
   /**
@@ -99,6 +79,12 @@ public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitConte
    * record the relationship between the RuntimeFilter producers and consumers.
    */
   public void collectRuntimeFilterParallelAndControlInfo() {
+    //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+    Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
+    //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+    Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
+    Map<Integer, Integer> joinMjId2rfNumber = new HashMap<>();
+
     RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector();
     rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null);
     List<RFHelperHolder> holders = runtimeFilterParallelismCollector.getHolders();
@@ -107,67 +93,33 @@ public void collectRuntimeFilterParallelAndControlInfo() {
       List<CoordinationProtos.DrillbitEndpoint> probeSideEndpoints = holder.getProbeSideScanEndpoints();
       int probeSideScanMajorId = holder.getProbeSideScanMajorId();
       int joinNodeMajorId = holder.getJoinMajorId();
+      int buildSideRfNumber = holder.getBuildSideRfNumber();
       RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef();
       boolean sendToForeman = runtimeFilterDef.isSendToForeman();
       if (sendToForeman) {
         //send RuntimeFilter to Foreman
-        joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints);
-        joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size());
+        joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints);
         joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId);
+        joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber);
       }
     }
+    runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps);
+    runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber);
+    runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId);
   }
 
-
   public void waitForComplete() {
     sendingAccountor.waitForSendComplete();
+    runtimeFilterSink.close();
   }
 
   /**
    * This method is passively invoked by receiving a runtime filter from the network
-   * @param runtimeFilterWritable
+   *
+   * @param srcRuntimeFilterWritable
    */
-  public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
-    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
-  }
-
-
-  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
-    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
-    int joinMajorId = runtimeFilterB.getMajorFragmentId();
-    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
-    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
-    DrillBuf[] data = srcRuntimeFilterWritable.getData();
-    List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
-    int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
-    for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
-      BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
-      for (String probeField : probeFields) {
-        builder.addProbeFields(probeField);
-      }
-      BitData.RuntimeFilterBDef runtimeFilterBDef = builder
-        .setQueryId(queryId)
-        .setMajorFragmentId(scanNodeMjId)
-        .setMinorFragmentId(minorId)
-        .build();
-      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
-      CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-      DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
-      Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
-        @Override
-        public void accept(final RpcException e) {
-          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
-        }
-
-        @Override
-        public void interrupt(final InterruptedException e) {
-          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
-        }
-      };
-      RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
-      AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
-      accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
-    }
+  public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+    runtimeFilterSink.add(srcRuntimeFilterWritable);
   }
 
   /**
@@ -183,18 +135,29 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder holder) throws RuntimeEx
       boolean isHashJoinOp = op instanceof HashJoinPOP;
       if (isHashJoinOp) {
         HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+        int hashJoinOpId = hashJoinPOP.getOperatorId();
         RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef();
-        if (runtimeFilterDef != null) {
-          if (holder == null) {
-            holder = new RFHelperHolder();
+        if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) {
+          if (holder == null || holder.getJoinOpId() != hashJoinOpId) {
+            holder = new RFHelperHolder(hashJoinOpId);
             holders.add(holder);
           }
           holder.setRuntimeFilterDef(runtimeFilterDef);
-          GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan();
-          Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP);
+          long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
+          WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP);
+          Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor);
+          if (container == null) {
+            throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId()));
+          }
+          int buildSideRFNumber = container.getAssignedEndpoints().size();
+          holder.setBuildSideRfNumber(buildSideRFNumber);
           int majorFragmentId = container.getMajorFragmentId();
           holder.setJoinMajorId(majorFragmentId);
-          Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp);
+          WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier);
+          Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor);
+          if (probeSideScanContainer == null) {
+            throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId()));
+          }
           int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId();
           List<CoordinationProtos.DrillbitEndpoint> probeSideScanEps = probeSideScanContainer.getAssignedEndpoints();
           holder.setProbeSideScanEndpoints(probeSideScanEps);
@@ -209,59 +172,63 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder holder) throws RuntimeEx
     }
   }
 
-  private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
+  private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
+    targetOpVisitor.setCurrentFragment(wrapper.getNode());
+    wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+    boolean contain = targetOpVisitor.isContain();
+    if (contain) {
+      return wrapper;
+    }
+    List<Wrapper> dependencies = wrapper.getFragmentDependencies();
+    if (CollectionUtils.isEmpty(dependencies)) {
+      return null;
+    }
+    for (Wrapper dependencyWrapper : dependencies) {
+      Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor);
+      if (opContainer != null) {
+        return opContainer;
+      }
+    }
+    return null;
+  }
 
-    private Fragment fragment;
+  private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable> extends AbstractPhysicalVisitor<T, X, E> {
 
-    private boolean contain = false;
+    protected Exchange sendingExchange;
 
-    private boolean targetIsGroupScan;
+    public void setCurrentFragment(Fragment fragment) {
+      sendingExchange = fragment.getSendingExchange();
+    }
 
-    private boolean targetIsHashJoin;
+    public abstract boolean isContain();
+  }
 
-    private String targetGroupScanDigest;
+  private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
 
-    private String targetHashJoinJson;
+    private boolean contain = false;
 
+    private PhysicalOperator targetOp;
 
-    public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
-      this.fragment = fragment;
-      this.targetIsGroupScan = targetOp instanceof GroupScan;
-      this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
-      this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null;
-      this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null;
+    public WrapperOperatorsVisitor(PhysicalOperator targetOp) {
+      this.targetOp = targetOp;
     }
 
     @Override
     public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
-      List<Fragment.ExchangeFragmentPair> exchangeFragmentPairs = fragment.getReceivingExchangePairs();
-      for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) {
-        boolean same = exchange == exchangeFragmentPair.getExchange();
-        if (same) {
-          return null;
-        }
+      if (exchange != sendingExchange) {
+        return null;
       }
       return exchange.getChild().accept(this, value);
     }
 
     @Override
     public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
-      boolean same = false;
-      if (targetIsGroupScan && op instanceof GroupScan) {
-        //Since GroupScan may be rewrite during the planing, here we use the digest to identify it.
-        String currentDigest = ((GroupScan) op).getDigest();
-        same = targetGroupScanDigest.equals(currentDigest);
-      }
-      if (targetIsHashJoin && op instanceof HashJoinPOP) {
-        String currentOpJson = jsonOfPhysicalOp(op);
-        same = targetHashJoinJson.equals(currentOpJson);
-      }
-      if (!same) {
+      if (op == targetOp) {
+        contain = true;
+      } else {
         for (PhysicalOperator child : op) {
           child.accept(this, value);
         }
-      } else {
-        contain = true;
       }
       return null;
     }
@@ -269,42 +236,57 @@ public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
     public boolean isContain() {
       return contain;
     }
+  }
+
+  private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
+
+    private boolean contain = false;
+
+    private long identifier;
+
+
+    public WrapperRuntimeFilterOperatorsVisitor(long identifier) {
+      this.identifier = identifier;
+    }
 
-    public String jsonOfPhysicalOp(PhysicalOperator operator) {
-      try {
-        ObjectMapper objectMapper = new ObjectMapper();
-        StringWriter stringWriter = new StringWriter();
-        objectMapper.writeValue(stringWriter, operator);
-        return stringWriter.toString();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    @Override
+    public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
+      if (exchange != sendingExchange) {
+        return null;
       }
+      return exchange.getChild().accept(this, value);
     }
-  }
 
-  private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) {
-    WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode());
-    wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null);
-    return wrapperOpsVistitor.isContain();
-  }
+    @Override
+    public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
+      boolean same;
+      boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP;
+      boolean isHashJoinPop = op instanceof HashJoinPOP;
 
-  private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) {
-    boolean contain = containsPhysicalOperator(wrapper, op);
-    if (contain) {
-      return wrapper;
-    }
-    List<Wrapper> dependencies = wrapper.getFragmentDependencies();
-    if (CollectionUtils.isEmpty(dependencies)) {
+      if (isHashJoinPop) {
+        HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+        PhysicalOperator leftPop = hashJoinPOP.getLeft();
+        leftPop.accept(this, value);
+        return null;
+      }
+
+      if (isRuntimeFilterPop) {
+        RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op;
+        same = this.identifier == runtimeFilterPOP.getIdentifier();
+        if (same) {
+          contain = true;
+          return null;
+        }
+      }
+      for (PhysicalOperator child : op) {
+        child.accept(this, value);
+      }
       return null;
     }
-    for (Wrapper dependencyWrapper : dependencies) {
-      Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op);
-      if (opContainer != null) {
-        return opContainer;
-      }
+
+    public boolean isContain() {
+      return contain;
     }
-    //should not be here
-    throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId()));
   }
 
   /**
@@ -320,6 +302,22 @@ private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) {
 
     private RuntimeFilterDef runtimeFilterDef;
 
+    private int joinOpId;
+
+    private int buildSideRfNumber;
+
+    public RFHelperHolder(int joinOpId) {
+      this.joinOpId = joinOpId;
+    }
+
+    public int getJoinOpId() {
+      return joinOpId;
+    }
+
+    public void setJoinOpId(int joinOpId) {
+      this.joinOpId = joinOpId;
+    }
+
     public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
       return probeSideScanEndpoints;
     }
@@ -352,5 +350,13 @@ public RuntimeFilterDef getRuntimeFilterDef() {
     public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) {
       this.runtimeFilterDef = runtimeFilterDef;
     }
+
+    public int getBuildSideRfNumber() {
+      return buildSideRfNumber;
+    }
+
+    public void setBuildSideRfNumber(int buildSideRfNumber) {
+      this.buildSideRfNumber = buildSideRfNumber;
+    }
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 14686254f93..f69a44ef7d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -17,206 +17,250 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable
+{
 
-  private int staleBookId = 0;
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+  private Map<Integer, Integer> joinMjId2rfNumber;
 
-  private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+  //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
+  //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
 
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
-  private ReentrantLock queueLock = new ReentrantLock();
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+  //for debug usage
+  private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
 
-  private Condition notEmpty = queueLock.newCondition();
+  private DrillbitContext drillbitContext;
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private SendingAccountor sendingAccountor;
 
-  private BufferAllocator bufferAllocator;
+  private  AsyncAggregateWorker asyncAggregateWorker;
 
-  private Future future;
+  private AtomicBoolean running = new AtomicBoolean(true);
 
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
-    this.bufferAllocator = bufferAllocator;
-    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    future = executorService.submit(asyncAggregateWorker);
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor)
+  {
+    this.drillbitContext = drillbitContext;
+    this.sendingAccountor = sendingAccountor;
+    asyncAggregateWorker = new AsyncAggregateWorker();
+    drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    if (running.get()) {
-      try {
-        aggregatedRFLock.lock();
-        if (containOne()) {
-          boolean same = aggregated.equals(runtimeFilterWritable);
-          if (!same) {
-            // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-            // share the same FragmentContext.
-            aggregated.close();
-            currentBookId.set(0);
-            staleBookId = 0;
-            clearQueued(false);
-          }
-        }
-      } finally {
-        aggregatedRFLock.unlock();
-      }
+  public void add(RuntimeFilterWritable runtimeFilterWritable)
+  {
+    if (!running.get()) {
+      runtimeFilterWritable.close();
+      return;
+    }
+    runtimeFilterWritable.retainBuffers(1);
+    int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+    if (joinMjId2Stopwatch.get(joinMjId) == null) {
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      joinMjId2Stopwatch.put(joinMjId, stopwatch);
+    }
+    synchronized (rfQueue) {
+      rfQueue.add(runtimeFilterWritable);
+      rfQueue.notify();
+    }
+  }
 
+  public void close() {
+    running.set(false);
+    if (asyncAggregateWorker != null) {
+      synchronized (rfQueue) {
+        rfQueue.notify();
+      }
+    }
+    while (!asyncAggregateWorker.over.get()) {
       try {
-        queueLock.lock();
-        if (rfQueue != null) {
-          rfQueue.add(runtimeFilterWritable);
-          notEmpty.signal();
-        } else {
-          runtimeFilterWritable.close();
-        }
-      } finally {
-        queueLock.unlock();
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e);
       }
-    } else {
+    }
+    for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) {
       runtimeFilterWritable.close();
     }
   }
 
-  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
-    try {
-      aggregatedRFLock.lock();
-      return aggregated.duplicate(bufferAllocator);
-    } finally {
-      aggregatedRFLock.unlock();
+  private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable)
+  {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    int buildSideRfNumber;
+    RuntimeFilterWritable toAggregated = null;
+    buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+    buildSideRfNumber--;
+    joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+    toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+    if (toAggregated == null) {
+      toAggregated = srcRuntimeFilterWritable;
+      toAggregated.retainBuffers(1);
+    } else {
+      toAggregated.aggregate(srcRuntimeFilterWritable);
     }
-  }
-
-  /**
-   * whether there's a fresh aggregated RuntimeFilter
-   *
-   * @return
-   */
-  public boolean hasFreshOne() {
-    if (currentBookId.get() > staleBookId) {
-      staleBookId = currentBookId.get();
-      return true;
+    joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+    if (buildSideRfNumber == 0) {
+      joinMjId2AggregatedRF.remove(joinMajorId);
+      route(toAggregated);
+      joinMjId2rfNumber.remove(joinMajorId);
+      Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId);
+      logger.info(
+          "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms",
+          joinMajorId,
+          stopwatch.elapsed(TimeUnit.MILLISECONDS)
+      );
     }
-    return false;
-  }
-
-  /**
-   * whether there's a usable RuntimeFilter.
-   *
-   * @return
-   */
-  public boolean containOne() {
-    return aggregated != null;
   }
 
-  private void doCleanup() {
-    running.compareAndSet(true, false);
-    try {
-      aggregatedRFLock.lock();
-      if (containOne()) {
-        aggregated.close();
-        aggregated = null;
+  private void route(RuntimeFilterWritable srcRuntimeFilterWritable)
+  {
+    BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+    List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
+    long rfIdentifier = runtimeFilterB.getRfIdentifier();
+    DrillBuf[] data = srcRuntimeFilterWritable.getData();
+    List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probeScanEps.get(joinMajorId);
+    int scanNodeSize = scanNodeEps.size();
+    srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
+    int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
+    for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
+      BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
+      for (String probeField : probeFields) {
+        builder.addProbeFields(probeField);
       }
-    } finally {
-      aggregatedRFLock.unlock();
+      BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId)
+                                                           .setMajorFragmentId(scanNodeMjId)
+                                                           .setMinorFragmentId(minorId)
+                                                           .setToForeman(false)
+                                                           .setRfIdentifier(rfIdentifier)
+                                                           .addAllBloomFilterSizeInBytes(sizeInBytes)
+                                                           .build();
+      RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
+      CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
+
+      DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
+      Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>()
+      {
+        @Override
+        public void accept(final RpcException e)
+        {
+          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+        }
+
+        @Override
+        public void interrupt(final InterruptedException e)
+        {
+          logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+        }
+      };
+      RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+      AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
+      accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
     }
   }
 
-  @Override
-  public void close() throws Exception {
-    future.cancel(true);
-    doCleanup();
+  public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber)
+  {
+    this.joinMjId2rfNumber = joinMjId2rfNumber;
   }
 
-  private void clearQueued(boolean setToNull) {
-    RuntimeFilterWritable toClear;
-    try {
-      queueLock.lock();
-      while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
-        toClear.close();
-      }
-      rfQueue = (setToNull) ? null : rfQueue;
-    } finally {
-      queueLock.unlock();
-    }
+  public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps)
+  {
+    this.joinMjId2probeScanEps = joinMjId2probeScanEps;
   }
 
-  private class AsyncAggregateWorker implements Runnable {
+  public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId)
+  {
+    this.joinMjId2ScanMjId = joinMjId2ScanMjId;
+  }
+
+  private class AsyncAggregateWorker implements Runnable
+  {
+    private AtomicBoolean over = new AtomicBoolean(false);
 
     @Override
-    public void run() {
-      try {
+    public void run()
+    {
+      while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) {
         RuntimeFilterWritable toAggregate = null;
-        while (running.get()) {
+        synchronized (rfQueue) {
           try {
-            queueLock.lock();
-            toAggregate = (rfQueue != null) ? rfQueue.poll() :  null;
-            if (toAggregate == null) {
-              notEmpty.await();
-              continue;
+            toAggregate = rfQueue.poll();
+            while (toAggregate == null && running.get()) {
+              rfQueue.wait();
+              toAggregate = rfQueue.poll();
             }
-          } finally {
-            queueLock.unlock();
+          } catch (InterruptedException ex) {
+            logger.error("RFW_Aggregator thread being interrupted", ex);
+            continue;
           }
-
-          try {
-            aggregatedRFLock.lock();
-            if (containOne()) {
-              aggregated.aggregate(toAggregate);
-
-              // Release the byteBuf referenced by toAggregate since aggregate will not do it
-              toAggregate.close();
-            } else {
-              aggregated = toAggregate;
-            }
-          } finally {
-            aggregatedRFLock.unlock();
+        }
+        if (toAggregate == null) {
+          continue;
+        }
+        // perform aggregate outside the sync block.
+        try {
+          aggregate(toAggregate);
+        } catch (Exception ex) {
+          logger.error("Failed to aggregate or route the RFW", ex);
+          throw new DrillRuntimeException(ex);
+        } finally {
+          if (toAggregate != null) {
+            toAggregate.close();
           }
-          currentBookId.incrementAndGet();
         }
-      } catch (InterruptedException e) {
-        logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
-        Thread.currentThread().interrupt();
-      } finally {
-        doCleanup();
-        clearQueued(true);
       }
+
+      if (!running.get()) {
+        RuntimeFilterWritable toClose;
+        while ((toClose = rfQueue.poll()) != null) {
+          toClose.close();
+        }
+      }
+      over.set(true);
     }
   }
 }
-
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 9a971e94cbc..f8c2701b145 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -103,6 +103,27 @@ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
     return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
+  public void retainBuffers(final int increment) {
+    if (increment <= 0) {
+      return;
+    }
+    for (final DrillBuf buf : data) {
+      buf.retain(increment);
+    }
+  }
+  //TODO: Not used currently because of DRILL-6826
+  public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) {
+    int bufNum = data.length;
+    DrillBuf [] newBufs = new DrillBuf[bufNum];
+    int i = 0;
+    for (DrillBuf buf : data) {
+      DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer;
+      newBufs[i] = transferredBuffer;
+      i++;
+    }
+    return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs);
+  }
+
   public String toString() {
     return identifier;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 42b76f278c7..a379db175a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -151,7 +151,7 @@ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
     this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
     this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
     this.profileOption = setProfileOption(queryContext.getOptions());
-    this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY);
+    this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
   }
 
 
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8aa3233a904..a2d3cdc7a50 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -469,6 +469,8 @@ drill.exec.options: {
     exec.hashjoin.enable.runtime_filter: false,
     exec.hashjoin.bloom_filter.fpp: 0.75,
     exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
+    exec.hashjoin.runtime_filter.waiting.enable: true,
+    exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
     exec.hashagg.mem_limit: 0,
     exec.hashagg.min_batches_per_partition: 2,
     exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
index 5eae12e6eba..a5fc5ba49af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPD.java
@@ -40,9 +40,9 @@
   public void testBroadcastHashJoin1Cond() {
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
-    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
+    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
     bloomFilterDefs.add(bloomFilterDef);
-    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
     HashJoinPOP joinConf = new HashJoinPOP(null, null,
       Lists.newArrayList(joinCond("lft", "EQUALS", "rgt")), JoinRelType.INNER, runtimeFilterDef);
     operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
@@ -71,11 +71,11 @@ public void testBroadcastHashJoin1Cond() {
   public void testBroadcastHashJoin2Cond() {
     List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
     int numBytes = BloomFilter.optimalNumOfBytes(2600, 0.01);
-    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft");
-    BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a");
+    BloomFilterDef bloomFilterDef = new BloomFilterDef(numBytes, true, "lft", "rgt");
+    BloomFilterDef bloomFilterDef1 = new BloomFilterDef(numBytes, true, "a", "b");
     bloomFilterDefs.add(bloomFilterDef);
     bloomFilterDefs.add(bloomFilterDef1);
-    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false );
+    RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
     HashJoinPOP joinConf = new HashJoinPOP(null, null,
       Lists.newArrayList(joinCond("lft", "EQUALS", "rgt"), joinCond("a", "EQUALS", "b")), JoinRelType.INNER, runtimeFilterDef);
     operatorFixture.getOptionManager().setLocalOption("exec.hashjoin.num_partitions", 4);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
index 2370ffa06aa..ac174d1cea5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDPlan.java
@@ -33,38 +33,6 @@ public static void setupTestFiles() throws Exception {
     test("alter session set `exec.hashjoin.enable.runtime_filter` = true");
   }
 
-  @Test
-  public void testInnnerHashJoin() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " dfs.`sample-data/nation.parquet` nations\n"
-      + "JOIN\n"
-      + "  dfs.`sample-data/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
-  @Test
-  public void testRightHashJoin() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " dfs.`sample-data/nation.parquet` nations\n"
-      + "RIGHT JOIN\n"
-      + "  dfs.`sample-data/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
   @Test
   public void testLeftHashJoin() throws Exception {
     String sql = "SELECT nations.N_NAME, count(*)"
@@ -95,24 +63,4 @@ public void testHashJoinWithFuncJoinCondition() throws Exception {
     String excludedColNames3 =  "\"runtime-filter\"";
     testPlanWithAttributesMatchingPatterns(sql, null, new String[]{excludedColNames1, excludedColNames2, excludedColNames3});
   }
-
-  @Test
-  public void testInnnerHashJoinWithRightDeepTree() throws Exception {
-    String sql = "SELECT nations.N_NAME, count(*)"
-      + "FROM\n"
-      + " cp.`tpch/nation.parquet` nations\n"
-      + "JOIN\n"
-      + "  cp.`tpch/region.parquet` regions\n"
-      + "  on nations.N_REGIONKEY = regions.R_REGIONKEY "
-      + "JOIN cp.`tpch/customer.parquet` customers\n"
-      + " on nations.N_NATIONKEY = customers.C_NATIONKEY "
-      + "WHERE nations.N_NAME = 'A' "
-      + "group by nations.N_NAME";
-    String expectedColNames1 =  "\"runtimeFilterDef\"";
-    String expectedColNames2 =  "\"bloomFilterDefs\"";
-    String expectedColNames3 =  "\"runtime-filter\"";
-    testPhysicalPlan(sql, expectedColNames1, expectedColNames2, expectedColNames3);
-  }
-
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
index c05cdfdf8ff..c1d1576007e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java
@@ -135,7 +135,6 @@ public boolean hasFailed() {
 
   @Test
   public void testNotExist() throws Exception {
-
     Drillbit bit = new Drillbit(c, RemoteServiceSet.getLocalServiceSet(), ClassPathScanner.fromPrescan(c));
     bit.run();
     DrillbitContext bitContext = bit.getContext();
@@ -192,6 +191,12 @@ public void testNotExist() throws Exception {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertFalse(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 
 
@@ -254,6 +259,12 @@ public void testExist() throws Exception {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertTrue(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 
 
@@ -324,5 +335,11 @@ public void testMerged() throws Exception {
     long hashCode = probeHash64.hash64Code(0, 0, 0);
     boolean contain = bloomFilter.find(hashCode);
     Assert.assertTrue(contain);
+    bloomFilter.getContent().close();
+    vectorContainer.clear();
+    probeVectorContainer.clear();
+    context.close();
+    bitContext.close();
+    bit.close();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index a1e7d0d2a41..b41798dedc7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.test;
 
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -26,10 +25,6 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Function;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import io.netty.buffer.DrillBuf;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -76,12 +71,7 @@
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -182,7 +172,6 @@ public OperatorFixture build() {
 
     private ExecutorState executorState = new OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
-    private RuntimeFilterSink runtimeFilterSink;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -198,7 +187,6 @@ public MockFragmentContext(final DrillConfig config,
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
-      this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
@@ -319,13 +307,18 @@ public void close() {
     }
 
     @Override
-    public RuntimeFilterSink getRuntimeFilterSink() {
-      return runtimeFilterSink;
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
     }
 
     @Override
-    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      runtimeFilterSink.aggregate(runtimeFilter);
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+      return null;
+    }
+
+    @Override
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+    {
+      return null;
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 84a7c785bdc..b0820e92626 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,7 +38,6 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -74,6 +73,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class PhysicalOpUnitTestBase extends ExecTest {
   protected MockExecutorFragmentContext fragContext;
@@ -198,12 +198,10 @@ protected OperatorTestBuilder opTestBuilder() {
    * </p>
    */
   protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
-    private RuntimeFilterSink runtimeFilterSink;
 
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
-      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
     }
 
     @Override
@@ -301,12 +299,17 @@ public boolean isUserAuthenticationEnabled() {
 
     @Override
     public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      this.runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
-    public RuntimeFilterSink getRuntimeFilterSink() {
-      return runtimeFilterSink;
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier) {
+      return null;
+    }
+
+    @Override
+    public RuntimeFilterWritable getRuntimeFilter(long rfIdentifier, long maxWaitTime, TimeUnit timeUnit)
+    {
+      return null;
     }
   }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index d7921fc0f9f..e43380db833 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2536,6 +2536,24 @@ public Builder clearIsLastBatch() {
      * </pre>
      */
     int getHjOpId();
+
+    // optional int64 rf_identifier = 8;
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    boolean hasRfIdentifier();
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    long getRfIdentifier();
   }
   /**
    * Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2650,6 +2668,11 @@ private RuntimeFilterBDef(
               hjOpId_ = input.readInt32();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000020;
+              rfIdentifier_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2867,6 +2890,30 @@ public int getHjOpId() {
       return hjOpId_;
     }
 
+    // optional int64 rf_identifier = 8;
+    public static final int RF_IDENTIFIER_FIELD_NUMBER = 8;
+    private long rfIdentifier_;
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    public boolean hasRfIdentifier() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int64 rf_identifier = 8;</code>
+     *
+     * <pre>
+     * the runtime filter identifier
+     * </pre>
+     */
+    public long getRfIdentifier() {
+      return rfIdentifier_;
+    }
+
     private void initFields() {
       queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
@@ -2875,6 +2922,7 @@ private void initFields() {
       bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
       probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       hjOpId_ = 0;
+      rfIdentifier_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2909,6 +2957,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeInt32(7, hjOpId_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(8, rfIdentifier_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2956,6 +3007,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(7, hjOpId_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(8, rfIdentifier_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3091,6 +3146,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000020);
         hjOpId_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        rfIdentifier_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -3154,6 +3211,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000010;
         }
         result.hjOpId_ = hjOpId_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.rfIdentifier_ = rfIdentifier_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3205,6 +3266,9 @@ public Builder mergeFrom(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef o
         if (other.hasHjOpId()) {
           setHjOpId(other.getHjOpId());
         }
+        if (other.hasRfIdentifier()) {
+          setRfIdentifier(other.getRfIdentifier());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3708,6 +3772,55 @@ public Builder clearHjOpId() {
         return this;
       }
 
+      // optional int64 rf_identifier = 8;
+      private long rfIdentifier_ ;
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public boolean hasRfIdentifier() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public long getRfIdentifier() {
+        return rfIdentifier_;
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public Builder setRfIdentifier(long value) {
+        bitField0_ |= 0x00000080;
+        rfIdentifier_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 rf_identifier = 8;</code>
+       *
+       * <pre>
+       * the runtime filter identifier
+       * </pre>
+       */
+      public Builder clearRfIdentifier() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        rfIdentifier_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
     }
 
@@ -3761,16 +3874,17 @@ public Builder clearHjOpId() {
       " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
       "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
       "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
-      "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
+      "isLastBatch\030\007 \001(\010\"\350\001\n\021RuntimeFilterBDef\022" +
       "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
       "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
       "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
       "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
-      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
-      "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
-      "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
-      "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
-      "l.exec.protoB\007BitDataH\001"
+      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" +
+      "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007",
+      "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" +
+      "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" +
+      "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" +
+      "DataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3800,7 +3914,7 @@ public Builder clearHjOpId() {
           internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", });
           return null;
         }
       };
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 3c88ffcedbd..ecf0f187f51 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -443,6 +443,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex
                     output.writeString(6, probeFields, true);
                 if(message.hasHjOpId())
                     output.writeInt32(7, message.getHjOpId(), false);
+                if(message.hasRfIdentifier())
+                    output.writeInt64(8, message.getRfIdentifier(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
             {
@@ -504,6 +506,9 @@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex
                         case 7:
                             builder.setHjOpId(input.readInt32());
                             break;
+                        case 8:
+                            builder.setRfIdentifier(input.readInt64());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -551,6 +556,7 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex
                 case 5: return "bloomFilterSizeInBytes";
                 case 6: return "probeFields";
                 case 7: return "hjOpId";
+                case 8: return "rfIdentifier";
                 default: return null;
             }
         }
@@ -569,6 +575,7 @@ public static int getFieldNumber(java.lang.String name)
             fieldMap.put("bloomFilterSizeInBytes", 5);
             fieldMap.put("probeFields", 6);
             fieldMap.put("hjOpId", 7);
+            fieldMap.put("rfIdentifier", 8);
         }
     }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 2d1c2a70253..3b2c1027e6a 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -56,6 +56,7 @@ public static RuntimeFilterBDef getDefaultInstance()
     private List<Integer> bloomFilterSizeInBytes;
     private List<String> probeFields;
     private int hjOpId;
+    private long rfIdentifier;
 
     public RuntimeFilterBDef()
     {
@@ -155,6 +156,19 @@ public RuntimeFilterBDef setHjOpId(int hjOpId)
         return this;
     }
 
+    // rfIdentifier
+
+    public long getRfIdentifier()
+    {
+        return rfIdentifier;
+    }
+
+    public RuntimeFilterBDef setRfIdentifier(long rfIdentifier)
+    {
+        this.rfIdentifier = rfIdentifier;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -235,6 +249,9 @@ public void mergeFrom(Input input, RuntimeFilterBDef message) throws IOException
                 case 7:
                     message.hjOpId = input.readInt32();
                     break;
+                case 8:
+                    message.rfIdentifier = input.readInt64();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -277,6 +294,9 @@ public void writeTo(Output output, RuntimeFilterBDef message) throws IOException
 
         if(message.hjOpId != 0)
             output.writeInt32(7, message.hjOpId, false);
+
+        if(message.rfIdentifier != 0)
+            output.writeInt64(8, message.rfIdentifier, false);
     }
 
     public String getFieldName(int number)
@@ -290,6 +310,7 @@ public String getFieldName(int number)
             case 5: return "bloomFilterSizeInBytes";
             case 6: return "probeFields";
             case 7: return "hjOpId";
+            case 8: return "rfIdentifier";
             default: return null;
         }
     }
@@ -310,6 +331,7 @@ public int getFieldNumber(String name)
         __fieldMap.put("bloomFilterSizeInBytes", 5);
         __fieldMap.put("probeFields", 6);
         __fieldMap.put("hjOpId", 7);
+        __fieldMap.put("rfIdentifier", 8);
     }
     
 }
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 15c72308ee2..ae9c4c70961 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -48,4 +48,5 @@ message RuntimeFilterBDef{
   repeated int32 bloom_filter_size_in_bytes = 5;
   repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
   optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
+  optional int64 rf_identifier = 8; // the runtime filter identifier
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services