You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/12 01:43:45 UTC

[drill] 01/02: DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7655ec4f54976def63101daabf34e51697978c57
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Sun Jul 1 00:02:55 2018 -0600

    DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together
---
 .../impl/xsort/managed/ExternalSortBatch.java      | 54 +++++++++++-----------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea7f51f..7db4d3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -315,7 +315,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     case START:
       return load();
     case LOAD:
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
       return (sortState == SortState.DONE) ? NONE : load();
     case DELIVER:
       return nextOutputBatch();
@@ -578,36 +580,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     if (incoming instanceof ExternalSortBatch) {
       ExternalSortBatch esb = (ExternalSortBatch) incoming;
-      esb.releaseResources();
+      esb.resetSortState();
     }
   }
 
   private void releaseResources() {
-    // This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT
-    // then release the resources
-    if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
-      (sortState == SortState.LOAD)) {
-
-      // Close the iterator here to release any remaining resources such
-      // as spill files. This is important when a query has a join: the
-      // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last batch before the
-      // fragment executor calls this operator's close method.
-      //
-      // Note however, that the StreamingAgg operator REQUIRES that the sort
-      // retain the batches behind an SV4 when doing an in-memory sort because
-      // the StreamingAgg retains a reference to that data that it will use
-      // after receiving a NONE result code. See DRILL-5656.
-      //zeroResources();
-      if (resultsIterator != null) {
-        resultsIterator.close();
-      }
-      // We only zero vectors for actual output container
-      outputWrapperContainer.clear();
-      outputSV4.clear();
-      container.zeroVectors();
+    if (resultsIterator != null) {
+      resultsIterator.close();
     }
 
+    // We only zero vectors for actual output container
+    outputWrapperContainer.clear();
+    outputSV4.clear();
+    container.zeroVectors();
+
     // Close sortImpl for this boundary
     if (sortImpl != null) {
       sortImpl.close();
@@ -620,6 +606,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    */
   private void resetSortState() {
     sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+    // This means if it has received NONE/EMIT outcome and flag to retain is false which will be the case in presence of
+    // StreamingAggBatch only since it will explicitly call releaseBacthes on ExternalSort when its done consuming
+    // all the data buffer.
+
+    // Close the iterator here to release any remaining resources such
+    // as spill files. This is important when a query has a join: the
+    // first branch sort may complete before the second branch starts;
+    // it may be quite a while after returning the last batch before the
+    // fragment executor calls this operator's close method.
+    //
+    // Note however, that the StreamingAgg operator REQUIRES that the sort
+    // retain the batches behind an SV4 when doing an in-memory sort because
+    // the StreamingAgg retains a reference to that data that it will use
+    // after receiving a NONE result code. See DRILL-5656.
     releaseResources();
 
     if (lastKnownOutcome == EMIT) {
@@ -674,7 +674,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       sortState = SortState.DELIVER;
     } else if (getRecordCount() == 0) { // There is no record to send downstream
       outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
     } else if (lastKnownOutcome == EMIT) {
       final boolean hasMoreRecords = outputSV4.hasNext();
       sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;