You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:07 UTC

[drill] 03/08: DRILL-6947: Fix RuntimeFilter memory leak

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 1887cce249e79058c7328c2bbf094b3d979e6ab2
Author: weijie.tong <we...@alipay.com>
AuthorDate: Sun Jan 6 15:48:55 2019 +0800

    DRILL-6947: Fix RuntimeFilter memory leak
---
 .../impl/filter/RuntimeFilterRecordBatch.java      | 44 ++++++++++++++++------
 .../exec/physical/impl/join/HashJoinBatch.java     |  8 +++-
 .../exec/work/filter/RuntimeFilterWritable.java    | 21 +++++++++++
 3 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bf7ed79..ac6718c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -224,21 +224,41 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
     setupHashHelper();
     //To make each independent bloom filter work together to construct a final filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
-    for (int i = 0; i < toFilterFields.size(); i++) {
-      BloomFilter bloomFilter = bloomFilters.get(i);
-      String fieldName = toFilterFields.get(i);
-      computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
-    }
+
+    int filterSize = toFilterFields.size();
     int svIndex = 0;
-    for (int i = 0; i < originalRecordCount; i++) {
-      boolean contain = bitSet.get(i);
-      if (contain) {
-        sv2.setIndex(svIndex, i);
-        svIndex++;
-      } else {
-        filteredRows++;
+    if (filterSize == 1) {
+      BloomFilter bloomFilter = bloomFilters.get(0);
+      String fieldName = toFilterFields.get(0);
+      int fieldId = field2id.get(fieldName);
+      for (int rowIndex = 0; rowIndex < originalRecordCount; rowIndex++) {
+        long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+        boolean contain = bloomFilter.find(hash);
+        if (contain) {
+          sv2.setIndex(svIndex, rowIndex);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
+      }
+    } else {
+      for (int i = 0; i < toFilterFields.size(); i++) {
+        BloomFilter bloomFilter = bloomFilters.get(i);
+        String fieldName = toFilterFields.get(i);
+        computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+      }
+      for (int i = 0; i < originalRecordCount; i++) {
+        boolean contain = bitSet.get(i);
+        if (contain) {
+          sv2.setIndex(svIndex, i);
+          svIndex++;
+        } else {
+          filteredRows++;
+        }
       }
     }
+
+
     appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809..30e8af7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -213,6 +213,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
   private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
   private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
   private List<BloomFilter> bloomFilters = new ArrayList<>();
+  private boolean bloomFiltersGenerated = false;
 
   /**
    * This holds information about the spilled partitions for the build and probe side.
@@ -818,8 +819,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
   }
 
+  /**
+   * Note:
+   * This method must not be run again when executeBuildPhase() is called recursively to handle spilled build partitions.
+   */
   private void initializeRuntimeFilter() {
-    if (!enableRuntimeFilter) {
+    if (!enableRuntimeFilter || bloomFiltersGenerated) {
       return;
     }
     runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
@@ -838,6 +843,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         bloomFilter2buildId.put(bloomFilter, buildFieldId);
       }
     }
+    bloomFiltersGenerated = true;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index f8c2701..aebd010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +40,9 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
   private String identifier;
 
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
+    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int bufArrLen = data.length;
+    Preconditions.checkArgument(bfSizeInBytes.size() == bufArrLen, "the input DrillBuf number does not match the metadata definition!");
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
     this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
@@ -46,6 +50,23 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
+  public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf data) {
+    this.runtimeFilterBDef = runtimeFilterBDef;
+    List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
+    int boomFilterNum = bfSizeInBytes.size();
+    this.data = new DrillBuf[boomFilterNum];
+    int index = 0;
+    for (int i = 0; i < boomFilterNum; i++) {
+      int length = bfSizeInBytes.get(i);
+      this.data[i] = data.slice(index, length);
+      index = index + length;
+    }
+
+    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+                      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+                      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
+  }
+
 
   public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
     return runtimeFilterBDef;