You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/18 00:28:25 UTC

[2/2] drill git commit: DRILL-2252: Cleanup resources when fragment is cancelled.

DRILL-2252: Cleanup resources when fragment is cancelled.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c4d91d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c4d91d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c4d91d0

Branch: refs/heads/master
Commit: 9c4d91d017ba2e7ceea89b5759d61fa73a658153
Parents: ccaabdb
Author: vkorukanti <ve...@gmail.com>
Authored: Sat Feb 14 22:18:31 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:50:33 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/work/fragment/FragmentExecutor.java    | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9c4d91d0/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a8f07b5..7ccb64e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,14 +47,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
   private final FragmentRoot rootOperator;
-  private RootExec root;
   private final FragmentContext context;
   private final WorkerBee bee;
   private final StatusReporter listener;
-  private Thread executionThread;
-  private AtomicBoolean closed = new AtomicBoolean(false);
   private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
 
+  private RootExec root;
+  private boolean closed;
+
   public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
     this.context = context;
     this.bee = bee;
@@ -99,7 +99,6 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
           context.getHandle().getMinorFragmentId()
           );
       Thread.currentThread().setName(newThreadName);
-      executionThread = Thread.currentThread();
 
       root = ImplCreator.getExec(context, rootOperator);
 
@@ -133,12 +132,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       bee.removeFragment(context.getHandle());
       context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
 
+      // Final check to make sure RecordBatches are cleaned up.
+      closeOutResources(false);
+
       Thread.currentThread().setName(originalThread);
     }
   }
 
   private void closeOutResources(boolean throwFailure) {
-    if (closed.get()) {
+    if (closed) {
       return;
     }
 
@@ -160,7 +162,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       logger.warn("Failure while closing out resources.", e);
     }
 
-    closed.set(true);
+    closed = true;
   }
 
   private void internalFail(Throwable excep) {