You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/18 00:28:25 UTC
[2/2] drill git commit: DRILL-2252: Cleanup resources when fragment
is cancelled.
DRILL-2252: Cleanup resources when fragment is cancelled.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c4d91d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c4d91d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c4d91d0
Branch: refs/heads/master
Commit: 9c4d91d017ba2e7ceea89b5759d61fa73a658153
Parents: ccaabdb
Author: vkorukanti <ve...@gmail.com>
Authored: Sat Feb 14 22:18:31 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:50:33 2015 -0800
----------------------------------------------------------------------
.../drill/exec/work/fragment/FragmentExecutor.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9c4d91d0/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a8f07b5..7ccb64e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,14 +47,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
private final FragmentRoot rootOperator;
- private RootExec root;
private final FragmentContext context;
private final WorkerBee bee;
private final StatusReporter listener;
- private Thread executionThread;
- private AtomicBoolean closed = new AtomicBoolean(false);
private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
+ private RootExec root;
+ private boolean closed;
+
public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
this.context = context;
this.bee = bee;
@@ -99,7 +99,6 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
context.getHandle().getMinorFragmentId()
);
Thread.currentThread().setName(newThreadName);
- executionThread = Thread.currentThread();
root = ImplCreator.getExec(context, rootOperator);
@@ -133,12 +132,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
bee.removeFragment(context.getHandle());
context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
+ // Final check to make sure RecordBatches are cleaned up.
+ closeOutResources(false);
+
Thread.currentThread().setName(originalThread);
}
}
private void closeOutResources(boolean throwFailure) {
- if (closed.get()) {
+ if (closed) {
return;
}
@@ -160,7 +162,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
logger.warn("Failure while closing out resources.", e);
}
- closed.set(true);
+ closed = true;
}
private void internalFail(Throwable excep) {