You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:42 UTC
[drill] 03/05: DRILL-6731: Move the BFs aggregating work from the
Foreman to the RuntimeFilter
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 216b1237739935b04c4f54b3f6f05371a4644085
Author: weijie.tong <we...@alipay.com>
AuthorDate: Thu Sep 6 19:23:35 2018 +0800
DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
---
.../org/apache/drill/exec/ops/FragmentContext.java | 8 +-
.../apache/drill/exec/ops/FragmentContextImpl.java | 16 +-
.../impl/filter/RuntimeFilterRecordBatch.java | 43 ++++--
.../exec/physical/impl/join/HashJoinBatch.java | 5 +-
.../physical/visitor/RuntimeFilterVisitor.java | 24 ++-
.../org/apache/drill/exec/work/WorkManager.java | 4 +-
.../exec/work/filter/RuntimeFilterReporter.java | 5 +-
...FilterManager.java => RuntimeFilterRouter.java} | 60 ++------
.../drill/exec/work/filter/RuntimeFilterSink.java | 171 +++++++++++++++++++++
.../exec/work/filter/RuntimeFilterWritable.java | 34 +++-
.../apache/drill/exec/work/foreman/Foreman.java | 16 +-
.../org/apache/drill/test/OperatorFixture.java | 11 +-
.../apache/drill/test/PhysicalOpUnitTestBase.java | 12 +-
.../java/org/apache/drill/exec/proto/BitData.java | 127 ++++++++++++++-
.../org/apache/drill/exec/proto/SchemaBitData.java | 7 +
.../drill/exec/proto/beans/RuntimeFilterBDef.java | 22 +++
protocol/src/main/protobuf/BitData.proto | 1 +
17 files changed, 448 insertions(+), 118 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c..88c21d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
void close();
/**
- * Return null ,if setRuntimeFilter not being called
* @return
*/
- RuntimeFilterWritable getRuntimeFilter();
+ RuntimeFilterSink getRuntimeFilterSink();
/**
- * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+ * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
* @param runtimeFilter
*/
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
interface ExecutorState {
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a898078..1f9d489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
/** Stores constants and their holders by type */
private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
- private RuntimeFilterWritable runtimeFilterWritable;
+ private RuntimeFilterSink runtimeFilterSink;
/**
* Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
constantValueHolderCache = Maps.newHashMap();
+ this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
}
/**
@@ -348,13 +350,13 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterWritable = runtimeFilter;
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ this.runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return runtimeFilterWritable;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
/**
@@ -470,8 +472,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
- if (runtimeFilterWritable != null) {
- suppressingClose(runtimeFilterWritable);
+ if (runtimeFilterSink != null) {
+ suppressingClose(runtimeFilterSink);
}
suppressingClose(bufferManager);
suppressingClose(allocator);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580..9248bbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -36,7 +36,9 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
@@ -56,6 +58,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
private Map<String, Integer> field2id = new HashMap<>();
private List<String> toFilterFields;
private List<BloomFilter> bloomFilters;
+ private RuntimeFilterWritable current;
+ private RuntimeFilterWritable previous;
private int originalRecordCount;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
@@ -102,6 +106,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.clear();
}
super.close();
+ if (current != null) {
+ current.close();
+ }
}
@Override
@@ -148,30 +155,36 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
* schema change hash64 should be reset and this method needs to be called again.
*/
private void setupHashHelper() {
- final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
-
+ final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
// Check if RuntimeFilterWritable was received by the minor fragment or not
- if (runtimeFilterWritable == null) {
+ if (!runtimeFilterSink.containOne()) {
return;
}
-
- // Check if bloomFilters is initialized or not
- if (bloomFilters == null) {
- bloomFilters = runtimeFilterWritable.unwrap();
+ if (runtimeFilterSink.hasFreshOne()) {
+ RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
+ if (current == null) {
+ current = freshRuntimeFilterWritable;
+ previous = freshRuntimeFilterWritable;
+ } else {
+ previous = current;
+ current = freshRuntimeFilterWritable;
+ previous.close();
+ }
+ bloomFilters = current.unwrap();
}
-
// Check if HashHelper is initialized or not
if (hash64 == null) {
ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context);
try {
//generate hash helper
- this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+ this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList();
List<LogicalExpression> hashFieldExps = new ArrayList<>();
List<TypedFieldId> typedFieldIds = new ArrayList<>();
for (String toFilterField : toFilterFields) {
SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
- this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+ int[] fieldIds = typedFieldId.getFieldIds();
+ this.field2id.put(toFilterField, fieldIds[0]);
typedFieldIds.add(typedFieldId);
ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
hashFieldExps.add(toHashFieldExp);
@@ -195,11 +208,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.setRecordCount(0);
return;
}
-
- final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+ final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
sv2.allocateNew(originalRecordCount);
-
- if (runtimeFilterWritable == null) {
+ if (!runtimeFilterSink.containOne()) {
// means none of the rows are filtered out hence set all the indexes
for (int i = 0; i < originalRecordCount; ++i) {
sv2.setIndex(i, i);
@@ -207,10 +218,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.setRecordCount(originalRecordCount);
return;
}
-
- // Setup a hash helper if need be
+ // Setup a hash helper if needed
setupHashHelper();
-
//To make each independent bloom filter work together to construct a final filter result: BitSet.
BitSet bitSet = new BitSet(originalRecordCount);
for (int i = 0; i < toFilterFields.size(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 658f03a..3d45696 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -724,7 +724,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
//RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
- //RuntimeFilterManager's judgement will have the RuntimeFilterDef.
+ //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
if (runtimeFilterDef != null) {
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -944,7 +944,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
if (cycleNum == 0 && enableRuntimeFilter) {
if (bloomFilters.size() > 0) {
- runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman());
+ int hashJoinOpId = this.popConfig.getOperatorId();
+ runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index c31e491..bfba5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -197,11 +197,8 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException {
if (holder != null) {
- boolean broadcastExchange = exchange instanceof BroadcastExchangePrel;
if (holder.isFromBuildSide()) {
- //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark
- //this HashJoin as BroadcastHashJoin
- holder.setEncounteredBroadcastExchange(broadcastExchange);
+ holder.setBuildSideExchange(exchange);
}
}
return visitPrel(exchange, holder);
@@ -224,15 +221,15 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
Prel right = (Prel) hashJoinPrel.getRight();
holder.setFromBuildSide(true);
right.accept(this, holder);
- boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
- if (buildSideEncountererdBroadcastExchange) {
+ boolean routeToForeman = holder.needToRouteToForeman();
+ if (!routeToForeman) {
runtimeFilterDef.setSendToForeman(false);
} else {
runtimeFilterDef.setSendToForeman(true);
}
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
- if (buildSideEncountererdBroadcastExchange) {
+ if (!routeToForeman) {
bloomFilterDef.setLocal(true);
} else {
bloomFilterDef.setLocal(false);
@@ -338,18 +335,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
* RuntimeFilter helper util holder
*/
private static class RFHelperHolder {
- //whether this join operator is a partitioned HashJoin or broadcast HashJoin,
- //also single node HashJoin is not expected to do JPPD.
- private boolean encounteredBroadcastExchange;
private boolean fromBuildSide;
- public boolean isEncounteredBroadcastExchange() {
- return encounteredBroadcastExchange;
+ private ExchangePrel exchangePrel;
+
+ public void setBuildSideExchange(ExchangePrel exchange){
+ this.exchangePrel = exchange;
}
- public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) {
- this.encounteredBroadcastExchange = encounteredBroadcastExchange;
+ public boolean needToRouteToForeman() {
+ return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel);
}
public boolean isFromBuildSide() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bf91ed3..0d97e0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -395,7 +395,7 @@ public class WorkManager implements AutoCloseable {
final String originalName = currentThread.getName();
currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
try {
- foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter);
+ foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
} catch (Exception e) {
logger.warn("Exception while registering the RuntimeFilter", e);
} finally {
@@ -413,7 +413,7 @@ public class WorkManager implements AutoCloseable {
.setQueryId(queryId).build();
FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle);
if (fragmentExecutor != null) {
- fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter);
+ fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index e6ede7a..6e4a9a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,7 @@ public class RuntimeFilterReporter {
this.context = context;
}
- public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) {
+ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
DrillBuf[] data = new DrillBuf[bloomFilters.size()];
List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -63,6 +63,7 @@ public class RuntimeFilterReporter {
.setMajorFragmentId(majorFragmentId)
.setMinorFragmentId(minorFragmentId)
.setToForeman(sendToForeman)
+ .setHjOpId(hashJoinOpId)
.addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
.build();
RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
@@ -72,7 +73,7 @@ public class RuntimeFilterReporter {
AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint);
dataTunnel.sendRuntimeFilter(runtimeFilterWritable);
} else {
- context.setRuntimeFilter(runtimeFilterWritable);
+ context.addRuntimeFilter(runtimeFilterWritable);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
similarity index 87%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index e3f89a6..5a8c6fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.work.filter;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.Consumer;
@@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
@@ -60,17 +59,14 @@ import java.util.concurrent.ConcurrentHashMap;
* The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
* To Partitioned case:
* The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate
- * the SV2.
+ * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which is downstream
+ * to the Scan node will aggregate all the received RuntimeFilter and will leverage it to filter out the
+ * scanned rows to generate the SV2.
* To Broadcast case:
* The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the
* RuntimeFilterRecordBath is the same as the Partitioned one.
- *
- *
- *
*/
-public class RuntimeFilterManager {
+public class RuntimeFilterRouter {
private Wrapper rootWrapper;
//HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
@@ -79,14 +75,12 @@ public class RuntimeFilterManager {
private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
//HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
- //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
- private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>();
private DrillbitContext drillbitContext;
private SendingAccountor sendingAccountor = new SendingAccountor();
- private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
/**
* This class maintains context for the runtime join push down's filter management. It
@@ -95,7 +89,7 @@ public class RuntimeFilterManager {
* @param workUnit
* @param drillbitContext
*/
- public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
+ public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
this.rootWrapper = workUnit.getRootWrapper();
this.drillbitContext = drillbitContext;
}
@@ -134,32 +128,16 @@ public class RuntimeFilterManager {
* @param runtimeFilterWritable
*/
public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
- BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef();
- int majorId = runtimeFilterB.getMajorFragmentId();
- UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
- List<String> probeFields = runtimeFilterB.getProbeFieldsList();
- logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
- int size;
- synchronized (this) {
- size = joinMjId2scanSize.get(majorId);
- Preconditions.checkState(size > 0);
- RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
- if (aggregatedRuntimeFilter == null) {
- aggregatedRuntimeFilter = runtimeFilterWritable;
- } else {
- aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
- }
- joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
- size--;
- joinMjId2scanSize.put(majorId, size);
- }
- if (size == 0) {
- broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
- }
+ broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
}
- private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) {
+ private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+ List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+ DrillBuf[] data = srcRuntimeFilterWritable.getData();
List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
@@ -172,10 +150,8 @@ public class RuntimeFilterManager {
.setMajorFragmentId(scanNodeMjId)
.setMinorFragmentId(minorId)
.build();
- RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId);
- RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData());
+ RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-
DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@Override
@@ -235,8 +211,6 @@ public class RuntimeFilterManager {
private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
- private PhysicalOperator targetOp;
-
private Fragment fragment;
private boolean contain = false;
@@ -251,7 +225,6 @@ public class RuntimeFilterManager {
public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
- this.targetOp = targetOp;
this.fragment = fragment;
this.targetIsGroupScan = targetOp instanceof GroupScan;
this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
@@ -343,13 +316,10 @@ public class RuntimeFilterManager {
private int probeSideScanMajorId;
-
-
private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;
private RuntimeFilterDef runtimeFilterDef;
-
public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
return probeSideScanEndpoints;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
new file mode 100644
index 0000000..8f4c823
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This sink receives the RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, supplies the aggregated
+ * one to the fragment running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
+
+ private AtomicInteger currentBookId = new AtomicInteger(0);
+
+ private int staleBookId = 0;
+
+ private RuntimeFilterWritable aggregated = null;
+
+ private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+
+ private AtomicBoolean running = new AtomicBoolean(true);
+
+ private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+ private Thread asyncAggregateThread;
+
+ private BufferAllocator bufferAllocator;
+
+ private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+ public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+ this.bufferAllocator = bufferAllocator;
+ AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+ asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+ asyncAggregateThread.start();
+ }
+
+ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+ if (running.get()) {
+ if (containOne()) {
+ boolean same = aggregated.same(runtimeFilterWritable);
+ if (!same) {
+ //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+ //share the same FragmentContext.
+ try {
+ aggregatedRFLock.lock();
+ aggregated.close();
+ aggregated = null;
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ currentBookId.set(0);
+ staleBookId = 0;
+ clearQueued();
+ }
+ }
+ rfQueue.add(runtimeFilterWritable);
+ } else {
+ runtimeFilterWritable.close();
+ }
+ }
+
+ public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+ try {
+ aggregatedRFLock.lock();
+ return aggregated.duplicate(bufferAllocator);
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ }
+
+ /**
+ * whether there's a fresh aggregated RuntimeFilter
+ *
+ * @return
+ */
+ public boolean hasFreshOne() {
+ if (currentBookId.get() > staleBookId) {
+ staleBookId = currentBookId.get();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * whether there's a usable RuntimeFilter.
+ *
+ * @return
+ */
+ public boolean containOne() {
+ return aggregated != null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ running.compareAndSet(true, false);
+ asyncAggregateThread.interrupt();
+ if (containOne()) {
+ try {
+ aggregatedRFLock.lock();
+ aggregated.close();
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ }
+ clearQueued();
+ }
+
+ private void clearQueued() {
+ RuntimeFilterWritable toClear;
+ while ((toClear = rfQueue.poll()) != null) {
+ toClear.close();
+ }
+ }
+
+ class AsyncAggregateWorker implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ while (running.get()) {
+ RuntimeFilterWritable toAggregate = rfQueue.take();
+ if (!running.get()) {
+ toAggregate.close();
+ return;
+ }
+ if (containOne()) {
+ try {
+ aggregatedRFLock.lock();
+ aggregated.aggregate(toAggregate);
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ } else {
+ aggregated = toAggregate;
+ }
+ currentBookId.incrementAndGet();
+ }
+ } catch (InterruptedException e) {
+ logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+ }
+ }
+ }
+}
+
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 8649e15..302a480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.filter;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData;
import java.util.ArrayList;
@@ -29,7 +30,7 @@ import java.util.List;
* A binary wire transferable representation of the RuntimeFilter which contains
* the runtime filter definition and its corresponding data.
*/
-public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+public class RuntimeFilterWritable implements AutoCloseables.Closeable{
private BitData.RuntimeFilterBDef runtimeFilterBDef;
@@ -81,6 +82,37 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable {
}
}
+ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+ int len = data.length;
+ DrillBuf[] cloned = new DrillBuf[len];
+ int i = 0;
+ for (DrillBuf src : data) {
+ int capacity = src.readableBytes();
+ DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+ int readerIndex = src.readerIndex();
+ src.readBytes(duplicateOne, 0, capacity);
+ src.readerIndex(readerIndex);
+ cloned[i] = duplicateOne;
+ i++;
+ }
+ return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+ }
+
+ public boolean same(RuntimeFilterWritable other) {
+ BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
+ int otherMajorId = runtimeFilterDef.getMajorFragmentId();
+ int otherMinorId = runtimeFilterDef.getMinorFragmentId();
+ int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
+ int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
+ int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
+ int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
+ return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+ }
+
+ public String toString() {
+ return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+ }
+
@Override
public void close() {
for (DrillBuf buf : data) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 634e832..42b76f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman;
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,7 +62,6 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.filter.RuntimeFilterManager;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
@@ -122,7 +122,7 @@ public class Foreman implements Runnable {
private String queryText;
- private RuntimeFilterManager runtimeFilterManager;
+ private RuntimeFilterRouter runtimeFilterRouter;
private boolean enableRuntimeFilter;
/**
@@ -410,8 +410,8 @@ public class Foreman implements Runnable {
queryRM.visitAbstractPlan(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
if (enableRuntimeFilter) {
- runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext);
- runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo();
+ runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
+ runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
}
if (textPlan != null) {
queryManager.setPlanText(textPlan.value);
@@ -734,8 +734,8 @@ public class Foreman implements Runnable {
logger.debug(queryIdString + ": cleaning up.");
injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
- if (enableRuntimeFilter && runtimeFilterManager != null) {
- runtimeFilterManager.waitForComplete();
+ if (enableRuntimeFilter && runtimeFilterRouter != null) {
+ runtimeFilterRouter.waitForComplete();
}
// remove the channel disconnected listener (doesn't throw)
closeFuture.removeListener(closeListener);
@@ -866,8 +866,8 @@ public class Foreman implements Runnable {
}
- public RuntimeFilterManager getRuntimeFilterManager() {
- return runtimeFilterManager;
+ public RuntimeFilterRouter getRuntimeFilterRouter() {
+ return runtimeFilterRouter;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index f867015..81d0d1a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.test;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -180,6 +181,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
private ExecutorState executorState = new OperatorFixture.MockExecutorState();
private ExecutionControls controls;
+ private RuntimeFilterSink runtimeFilterSink;
public MockFragmentContext(final DrillConfig config,
final OptionManager options,
@@ -195,6 +197,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
this.controls = new ExecutionControls(options);
compiler = new CodeCompiler(config, options);
bufferManager = new BufferManagerImpl(allocator);
+ this.runtimeFilterSink = new RuntimeFilterSink(allocator);
}
private static FunctionImplementationRegistry newFunctionRegistry(
@@ -315,13 +318,13 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return null;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 1c4779c..559f7f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -203,11 +204,12 @@ public class PhysicalOpUnitTestBase extends ExecTest {
* </p>
*/
protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
- private RuntimeFilterWritable runtimeFilterWritable;
+ private RuntimeFilterSink runtimeFilterSink;
public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
+ this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
}
@Override
@@ -304,13 +306,13 @@ public class PhysicalOpUnitTestBase extends ExecTest {
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterWritable = runtimeFilter;
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ this.runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return runtimeFilterWritable;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
}
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 20abb3b..d7921fc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2518,6 +2518,24 @@ public final class BitData {
*/
com.google.protobuf.ByteString
getProbeFieldsBytes(int index);
+
+ // optional int32 hj_op_id = 7;
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ boolean hasHjOpId();
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ int getHjOpId();
}
/**
* Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2627,6 +2645,11 @@ public final class BitData {
probeFields_.add(input.readBytes());
break;
}
+ case 56: {
+ bitField0_ |= 0x00000010;
+ hjOpId_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2820,6 +2843,30 @@ public final class BitData {
return probeFields_.getByteString(index);
}
+ // optional int32 hj_op_id = 7;
+ public static final int HJ_OP_ID_FIELD_NUMBER = 7;
+ private int hjOpId_;
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public boolean hasHjOpId() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public int getHjOpId() {
+ return hjOpId_;
+ }
+
private void initFields() {
queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
majorFragmentId_ = 0;
@@ -2827,6 +2874,7 @@ public final class BitData {
toForeman_ = false;
bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+ hjOpId_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2858,6 +2906,9 @@ public final class BitData {
for (int i = 0; i < probeFields_.size(); i++) {
output.writeBytes(6, probeFields_.getByteString(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt32(7, hjOpId_);
+ }
getUnknownFields().writeTo(output);
}
@@ -2901,6 +2952,10 @@ public final class BitData {
size += dataSize;
size += 1 * getProbeFieldsList().size();
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(7, hjOpId_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3034,6 +3089,8 @@ public final class BitData {
bitField0_ = (bitField0_ & ~0x00000010);
probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000020);
+ hjOpId_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -3093,6 +3150,10 @@ public final class BitData {
bitField0_ = (bitField0_ & ~0x00000020);
}
result.probeFields_ = probeFields_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.hjOpId_ = hjOpId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3141,6 +3202,9 @@ public final class BitData {
}
onChanged();
}
+ if (other.hasHjOpId()) {
+ setHjOpId(other.getHjOpId());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3595,6 +3659,55 @@ public final class BitData {
return this;
}
+ // optional int32 hj_op_id = 7;
+ private int hjOpId_ ;
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public boolean hasHjOpId() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public int getHjOpId() {
+ return hjOpId_;
+ }
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public Builder setHjOpId(int value) {
+ bitField0_ |= 0x00000040;
+ hjOpId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 hj_op_id = 7;</code>
+ *
+ * <pre>
+ * the operator id of the HashJoin which generates this RuntimeFilter
+ * </pre>
+ */
+ public Builder clearHjOpId() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ hjOpId_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
}
@@ -3648,16 +3761,16 @@ public final class BitData {
" \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
"!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
"f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
- "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" +
+ "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
"&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
"\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
"ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" +
"om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
- "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" +
- "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020",
- "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" +
- "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" +
- "ataH\001"
+ "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType" +
+ "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
+ "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
+ "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
+ "l.exec.protoB\007BitDataH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3687,7 +3800,7 @@ public final class BitData {
internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
- new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", });
+ new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", });
return null;
}
};
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 6aa54dd..3c88ffc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -441,6 +441,8 @@ public final class SchemaBitData
output.writeInt32(5, bloomFilterSizeInBytes, true);
for(String probeFields : message.getProbeFieldsList())
output.writeString(6, probeFields, true);
+ if(message.hasHjOpId())
+ output.writeInt32(7, message.getHjOpId(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
{
@@ -499,6 +501,9 @@ public final class SchemaBitData
case 6:
builder.addProbeFields(input.readString());
break;
+ case 7:
+ builder.setHjOpId(input.readInt32());
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -545,6 +550,7 @@ public final class SchemaBitData
case 4: return "toForeman";
case 5: return "bloomFilterSizeInBytes";
case 6: return "probeFields";
+ case 7: return "hjOpId";
default: return null;
}
}
@@ -562,6 +568,7 @@ public final class SchemaBitData
fieldMap.put("toForeman", 4);
fieldMap.put("bloomFilterSizeInBytes", 5);
fieldMap.put("probeFields", 6);
+ fieldMap.put("hjOpId", 7);
}
}
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 99f3c78..2d1c2a7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -55,6 +55,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
private Boolean toForeman;
private List<Integer> bloomFilterSizeInBytes;
private List<String> probeFields;
+ private int hjOpId;
public RuntimeFilterBDef()
{
@@ -141,6 +142,19 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
return this;
}
+ // hjOpId
+
+ public int getHjOpId()
+ {
+ return hjOpId;
+ }
+
+ public RuntimeFilterBDef setHjOpId(int hjOpId)
+ {
+ this.hjOpId = hjOpId;
+ return this;
+ }
+
// java serialization
public void readExternal(ObjectInput in) throws IOException
@@ -218,6 +232,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
message.probeFields = new ArrayList<String>();
message.probeFields.add(input.readString());
break;
+ case 7:
+ message.hjOpId = input.readInt32();
+ break;
default:
input.handleUnknownField(number, this);
}
@@ -257,6 +274,9 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
output.writeString(6, probeFields, true);
}
}
+
+ if(message.hjOpId != 0)
+ output.writeInt32(7, message.hjOpId, false);
}
public String getFieldName(int number)
@@ -269,6 +289,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
case 4: return "toForeman";
case 5: return "bloomFilterSizeInBytes";
case 6: return "probeFields";
+ case 7: return "hjOpId";
default: return null;
}
}
@@ -288,6 +309,7 @@ public final class RuntimeFilterBDef implements Externalizable, Message<RuntimeF
__fieldMap.put("toForeman", 4);
__fieldMap.put("bloomFilterSizeInBytes", 5);
__fieldMap.put("probeFields", 6);
+ __fieldMap.put("hjOpId", 7);
}
}
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 063efe4..15c7230 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -47,4 +47,5 @@ message RuntimeFilterBDef{
optional bool to_foreman = 4; // true means sending to foreman,false means sending to scan nodes
repeated int32 bloom_filter_size_in_bytes = 5;
repeated string probe_fields = 6; // probe fields with corresponding BloomFilters
+ optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter
}