You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:08 UTC
[drill] 04/08: DRILL-6999: Fix the case where there is more than one
join condition
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4e03d54cd854c08a5ed96a67e7c27f02fa5ff435
Author: weijie.tong <we...@alipay.com>
AuthorDate: Thu Jan 24 22:06:40 2019 +0800
DRILL-6999: Fix the case where there is more than one join condition
closes #1600
---
.../drill/exec/rpc/data/DataServerRequestHandler.java | 13 ++++++++++++-
.../drill/exec/work/filter/RuntimeFilterWritable.java | 18 ------------------
2 files changed, 12 insertions(+), 19 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
index e60fcae..5ad7ba4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.fragment.FragmentManager;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
// package private
@@ -106,7 +107,17 @@ class DataServerRequestHandler implements RequestHandler<DataServerConnection> {
if (dBody == null) {
return;
}
- RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, (DrillBuf) dBody);
+ List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+ int boomFilterNum = bfSizeInBytes.size();
+ DrillBuf data = (DrillBuf) dBody;
+ DrillBuf[] bufs = new DrillBuf[boomFilterNum];
+ int index = 0;
+ for (int i = 0; i < boomFilterNum; i++) {
+ int length = bfSizeInBytes.get(i);
+ bufs[i] = data.slice(index, length);
+ index = index + length;
+ }
+ RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, bufs);
AckSender ackSender = new AckSender(sender);
ackSender.increment();
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index aebd010..566781b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -50,24 +50,6 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
+ ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
}
- public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) {
- this.runtimeFilterBDef = runtimeFilterBDef;
- List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
- int boomFilterNum = bfSizeInBytes.size();
- this.data = new DrillBuf[boomFilterNum];
- int index = 0;
- for (int i = 0; i < boomFilterNum; i++) {
- int length = bfSizeInBytes.get(i);
- this.data[i] = data.slice(index, length);
- index = index + length;
- }
-
- this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
- + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
- + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
- }
-
-
public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
return runtimeFilterBDef;
}