You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/12 01:43:45 UTC
[drill] 01/02: DRILL-6516: Fix memory leak issue with Sort and
StreamingAgg together
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7655ec4f54976def63101daabf34e51697978c57
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Sun Jul 1 00:02:55 2018 -0600
DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together
---
.../impl/xsort/managed/ExternalSortBatch.java | 54 +++++++++++-----------
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea7f51f..7db4d3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -315,7 +315,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
case START:
return load();
case LOAD:
- resetSortState();
+ if (!this.retainInMemoryBatchesOnNone) {
+ resetSortState();
+ }
return (sortState == SortState.DONE) ? NONE : load();
case DELIVER:
return nextOutputBatch();
@@ -578,36 +580,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
if (incoming instanceof ExternalSortBatch) {
ExternalSortBatch esb = (ExternalSortBatch) incoming;
- esb.releaseResources();
+ esb.resetSortState();
}
}
private void releaseResources() {
- // This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT
- // then release the resources
- if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
- (sortState == SortState.LOAD)) {
-
- // Close the iterator here to release any remaining resources such
- // as spill files. This is important when a query has a join: the
- // first branch sort may complete before the second branch starts;
- // it may be quite a while after returning the last batch before the
- // fragment executor calls this operator's close method.
- //
- // Note however, that the StreamingAgg operator REQUIRES that the sort
- // retain the batches behind an SV4 when doing an in-memory sort because
- // the StreamingAgg retains a reference to that data that it will use
- // after receiving a NONE result code. See DRILL-5656.
- //zeroResources();
- if (resultsIterator != null) {
- resultsIterator.close();
- }
- // We only zero vectors for actual output container
- outputWrapperContainer.clear();
- outputSV4.clear();
- container.zeroVectors();
+ if (resultsIterator != null) {
+ resultsIterator.close();
}
+ // We only zero vectors for actual output container
+ outputWrapperContainer.clear();
+ outputSV4.clear();
+ container.zeroVectors();
+
// Close sortImpl for this boundary
if (sortImpl != null) {
sortImpl.close();
@@ -620,6 +606,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
*/
private void resetSortState() {
sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+ // This means if it has received NONE/EMIT outcome and flag to retain is false which will be the case in presence of
+ // StreamingAggBatch only since it will explicitly call releaseBatches on ExternalSort when it's done consuming
+ // all the data buffers.
+
+ // Close the iterator here to release any remaining resources such
+ // as spill files. This is important when a query has a join: the
+ // first branch sort may complete before the second branch starts;
+ // it may be quite a while after returning the last batch before the
+ // fragment executor calls this operator's close method.
+ //
+ // Note however, that the StreamingAgg operator REQUIRES that the sort
+ // retain the batches behind an SV4 when doing an in-memory sort because
+ // the StreamingAgg retains a reference to that data that it will use
+ // after receiving a NONE result code. See DRILL-5656.
releaseResources();
if (lastKnownOutcome == EMIT) {
@@ -674,7 +674,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
sortState = SortState.DELIVER;
} else if (getRecordCount() == 0) { // There is no record to send downstream
outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
- resetSortState();
+ if (!this.retainInMemoryBatchesOnNone) {
+ resetSortState();
+ }
} else if (lastKnownOutcome == EMIT) {
final boolean hasMoreRecords = outputSV4.hasNext();
sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;