You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:07 UTC
[drill] 03/08: DRILL-6947: Fix RuntimeFilter memory leak
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 1887cce249e79058c7328c2bbf094b3d979e6ab2
Author: weijie.tong <we...@alipay.com>
AuthorDate: Sun Jan 6 15:48:55 2019 +0800
DRILL-6947: Fix RuntimeFilter memory leak
---
.../impl/filter/RuntimeFilterRecordBatch.java | 44 ++++++++++++++++------
.../exec/physical/impl/join/HashJoinBatch.java | 8 +++-
.../exec/work/filter/RuntimeFilterWritable.java | 21 +++++++++++
3 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bf7ed79..ac6718c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -224,21 +224,41 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
setupHashHelper();
//To make each independent bloom filter work together to construct a final filter result: BitSet.
BitSet bitSet = new BitSet(originalRecordCount);
- for (int i = 0; i < toFilterFields.size(); i++) {
- BloomFilter bloomFilter = bloomFilters.get(i);
- String fieldName = toFilterFields.get(i);
- computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
- }
+
+ int filterSize = toFilterFields.size();
int svIndex = 0;
- for (int i = 0; i < originalRecordCount; i++) {
- boolean contain = bitSet.get(i);
- if (contain) {
- sv2.setIndex(svIndex, i);
- svIndex++;
- } else {
- filteredRows++;
+ if (filterSize == 1) {
+ BloomFilter bloomFilter = bloomFilters.get(0);
+ String fieldName = toFilterFields.get(0);
+ int fieldId = field2id.get(fieldName);
+ for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
+ long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+ boolean contain = bloomFilter.find(hash);
+ if (contain) {
+ sv2.setIndex(svIndex, rowIndex);
+ svIndex++;
+ } else {
+ filteredRows++;
+ }
+ }
+ } else {
+ for (int i = 0; i < toFilterFields.size(); i++) {
+ BloomFilter bloomFilter = bloomFilters.get(i);
+ String fieldName = toFilterFields.get(i);
+ computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+ }
+ for (int i = 0; i < originalRecordCount; i++) {
+ boolean contain = bitSet.get(i);
+ if (contain) {
+ sv2.setIndex(svIndex, i);
+ svIndex++;
+ } else {
+ filteredRows++;
+ }
}
}
+
+
appliedTimes++;
sv2.setRecordCount(svIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809..30e8af7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -213,6 +213,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
private List<BloomFilter> bloomFilters = new ArrayList<>();
+ private boolean bloomFiltersGenerated = false;
/**
* This holds information about the spilled partitions for the build and probe side.
@@ -818,8 +819,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
}
+ /**
+ * Note:
+ * This method can not be called again as part of recursive call of executeBuildPhase() to handle spilled build partitions.
+ */
private void initializeRuntimeFilter() {
- if (!enableRuntimeFilter) {
+ if (!enableRuntimeFilter || bloomFiltersGenerated) {
return;
}
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
@@ -838,6 +843,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
bloomFilter2buildId.put(bloomFilter, buildFieldId);
}
}
+ bloomFiltersGenerated = true;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index f8c2701..aebd010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
@@ -39,6 +40,9 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
private String identifier;
public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
+ List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+ int bufArrLen = data.length;
+ Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input DrillBuf number does not match the metadata definition!");
this.runtimeFilterBDef = runtimeFilterBDef;
this.data = data;
this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
@@ -46,6 +50,23 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
+ ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
}
+ public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) {
+ this.runtimeFilterBDef = runtimeFilterBDef;
+ List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+ int boomFilterNum = bfSizeInBytes.size();
+ this.data = new DrillBuf[boomFilterNum];
+ int index = 0;
+ for (int i = 0; i < boomFilterNum; i++) {
+ int length = bfSizeInBytes.get(i);
+ this.data[i] = data.slice(index, length);
+ index = index + length;
+ }
+
+ this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+ + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+ + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
+ }
+
public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
return runtimeFilterBDef;