You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/16 03:38:44 UTC

[5/7] drill git commit: DRILL-3092: Fix memory leak risk near uses of RecordBatchData classes that need to have their resources freed upon an allocation or other runtime failure.

DRILL-3092: Fix memory leak risk near uses of RecordBatchData classes that need to have their resources freed upon an allocation or other runtime failure.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e838abf8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e838abf8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e838abf8

Branch: refs/heads/master
Commit: e838abf8850726d039de47732eb5a713c8bf51ba
Parents: 30f5892
Author: Jason Altekruse <al...@gmail.com>
Authored: Thu May 14 17:32:09 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 15 17:38:27 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/TopN/TopNBatch.java      | 26 +++++---
 .../exec/physical/impl/join/HashJoinBatch.java  | 22 ++++---
 .../physical/impl/join/NestedLoopJoinBatch.java | 12 +++-
 .../impl/producer/ProducerConsumerBatch.java    |  2 +-
 .../physical/impl/sort/RecordBatchData.java     |  7 +++
 .../physical/impl/xsort/ExternalSortBatch.java  | 62 +++++++++++---------
 6 files changed, 85 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 349f1b1..516b028 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -217,15 +217,23 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           countSincePurge += incoming.getRecordCount();
           batchCount++;
           RecordBatchData batch = new RecordBatchData(incoming);
-          batch.canonicalize();
-          if (priorityQueue == null) {
-            priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
-          }
-          priorityQueue.add(context, batch);
-          if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
-            purge();
-            countSincePurge = 0;
-            batchCount = 0;
+          boolean success = false;
+          try {
+            batch.canonicalize();
+            if (priorityQueue == null) {
+              priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+            }
+            priorityQueue.add(context, batch);
+            if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
+              purge();
+              countSincePurge = 0;
+              batchCount = 0;
+            }
+            success = true;
+          } finally {
+            if (!success) {
+              batch.clear();
+            }
           }
           break;
         default:

http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 6490251..5fd866f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -389,14 +389,22 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                      * records that have matching keys on the probe side.
                      */
         RecordBatchData nextBatch = new RecordBatchData(right);
-        if (hyperContainer == null) {
-          hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
-        } else {
-          hyperContainer.addBatch(nextBatch.getContainer());
-        }
+        boolean success = false;
+        try {
+          if (hyperContainer == null) {
+            hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+          } else {
+            hyperContainer.addBatch(nextBatch.getContainer());
+          }
 
-        // completed processing a batch, increment batch index
-        buildBatchIndex++;
+          // completed processing a batch, increment batch index
+          buildBatchIndex++;
+          success = true;
+        } finally {
+          if (!success) {
+            nextBatch.clear();
+          }
+        }
         break;
       }
       // Get the next record batch

http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 9bcea60..2d37fa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -342,8 +342,16 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
 
   private void addBatchToHyperContainer(RecordBatch inputBatch) {
     RecordBatchData batchCopy = new RecordBatchData(inputBatch);
-    rightCounts.addLast(inputBatch.getRecordCount());
-    rightContainer.addBatch(batchCopy.getContainer());
+    boolean success = false;
+    try {
+      rightCounts.addLast(inputBatch.getRecordCount());
+      rightContainer.addBatch(batchCopy.getContainer());
+      success = true;
+    } finally {
+      if (!success) {
+        batchCopy.clear();
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bca9622..b9a1641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -171,7 +171,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
           }
         }
         if (wrapper!=null) {
-          wrapper.batch.getContainer().zeroVectors();
+          wrapper.batch.clear();
         }
         cleanUpLatch.countDown();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index fbd472e..af774db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -96,4 +96,11 @@ public class RecordBatchData {
     return container;
   }
 
+  public void clear() {
+    if (sv2 != null) {
+      sv2.clear();
+    }
+    container.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index de4a86e..8871a5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -305,36 +305,44 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           sorter.sort(sv2);
 //          logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), count);
           RecordBatchData rbd = new RecordBatchData(incoming);
-          if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
-            rbd.setSv2(sv2);
-          }
-          batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
-          batchesSinceLastSpill++;
-          if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
-              (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
-              // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
-              (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
-              // current memory used is more than 95% of memory usage limit of this operator
-              (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
-              // current memory used is more than 95% of memory usage limit of this fragment
-              (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
-              // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
-              // since the last spill exceed the defined limit
-              (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
-
-            if (firstSpillBatchCount == 0) {
-              firstSpillBatchCount = batchGroups.size();
+          boolean success = false;
+          try {
+            if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
+              rbd.setSv2(sv2);
             }
-
-            if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
-              logger.info("Merging spills");
-              spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+            batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
+            batchesSinceLastSpill++;
+            if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
+                (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+                // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+                (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
+                // current memory used is more than 95% of memory usage limit of this operator
+                (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+                // current memory used is more than 95% of memory usage limit of this fragment
+                (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
+                // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
+                // since the last spill exceed the defined limit
+                (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+
+              if (firstSpillBatchCount == 0) {
+                firstSpillBatchCount = batchGroups.size();
+              }
+
+              if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+                logger.info("Merging spills");
+                spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+              }
+              spilledBatchGroups.add(mergeAndSpill(batchGroups));
+              batchesSinceLastSpill = 0;
             }
-            spilledBatchGroups.add(mergeAndSpill(batchGroups));
-            batchesSinceLastSpill = 0;
-          }
-          long t = w.elapsed(TimeUnit.MICROSECONDS);
+            long t = w.elapsed(TimeUnit.MICROSECONDS);
 //          logger.debug("Took {} us to sort {} records", t, count);
+            success = true;
+          } finally {
+            if (!success) {
+              rbd.clear();
+            }
+          }
           break;
         case OUT_OF_MEMORY:
           logger.debug("received OUT_OF_MEMORY, trying to spill");