You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:57 UTC

[11/17] drill git commit: DRILL-3052, DRILL-3066: Improve fragment state management in the face of early cancellation.

DRILL-3052, DRILL-3066: Improve fragment state management in the face of early cancellation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/aaf9fb83
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/aaf9fb83
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/aaf9fb83

Branch: refs/heads/master
Commit: aaf9fb834e02b9a3483758dd6eb475cc781db866
Parents: 4b0b3a6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 20:07:29 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:56 2015 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    | 106 ++++++++++---------
 1 file changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/aaf9fb83/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index ffb76b1..e5e0700 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.fragment;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.DeferredException;
@@ -53,6 +54,7 @@ public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
 
+  private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
   private final String fragmentName;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
@@ -139,25 +141,10 @@ public class FragmentExecutor implements Runnable {
    * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
-    /*
-     * When cancel() is called before run(), root is not initialized and the executor is not
-     * ready to accept external events. So do not wait to change the state.
-     *
-     * For example, consider the case when the Foreman sets up the root fragment executor which is
-     * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
-     * run() method on the root executor will never be called, and the executor will never be ready
-     * to accept external events. This would make the cancelling thread wait forever, if it was waiting on
-     * acceptExternalEvents.
-     */
-    synchronized (this) {
-      if (root != null) {
-        acceptExternalEvents.awaitUninterruptibly();
-      } else {
-        // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called.
-        // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
-        // fragment will never start running.
-        closeOutResources();
-      }
+    final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true);
+
+    if (!thisIsOnlyThread) {
+      acceptExternalEvents.awaitUninterruptibly();
 
       /*
        * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
@@ -165,14 +152,33 @@ public class FragmentExecutor implements Runnable {
       updateState(FragmentState.CANCELLATION_REQUESTED);
 
       /*
-       * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+       * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
+       * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
+       * procedure of the main thread.
        */
-      final Thread myThread = myThreadRef.get();
-      if (myThread != null) {
-        logger.debug("Interrupting fragment thread {}", myThread.getName());
-        myThread.interrupt();
+      synchronized (myThreadRef) {
+        final Thread myThread = myThreadRef.get();
+        if (myThread != null) {
+          logger.debug("Interrupting fragment thread {}", myThread.getName());
+          myThread.interrupt();
+        }
       }
+    } else {
+      updateState(FragmentState.CANCELLATION_REQUESTED);
+      cleanup(FragmentState.FINISHED);
     }
+
+  }
+
+  private void cleanup(FragmentState state) {
+
+    closeOutResources();
+
+    updateState(state);
+    // send the final state of the fragment. only the main execution thread can send the final state and it can
+    // only be sent once.
+    sendFinalState();
+
   }
 
   /**
@@ -203,6 +209,11 @@ public class FragmentExecutor implements Runnable {
 
   @Override
   public void run() {
+    // if a cancel thread has already entered this executor, we have no reason to continue.
+    if (!hasCloseoutThread.compareAndSet(false, true)) {
+      return;
+    }
+
     final Thread myThread = Thread.currentThread();
     myThreadRef.set(myThread);
     final String originalThreadName = myThread.getName();
@@ -216,31 +227,24 @@ public class FragmentExecutor implements Runnable {
 
       myThread.setName(newThreadName);
 
-      synchronized (this) {
-        /*
-         * fragmentState might have changed even before this method is called e.g. cancel()
-         */
-        if (shouldContinue()) {
-          // if we didn't get the root operator when the executor was created, create it now.
-          final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
-              drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      // if we didn't get the root operator when the executor was created, create it now.
+      final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+          drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
 
           root = ImplCreator.getExec(fragmentContext, rootOperator);
           if (root == null) {
             return;
           }
 
-          clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
-          updateState(FragmentState.RUNNING);
+      clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+      updateState(FragmentState.RUNNING);
 
-          acceptExternalEvents.countDown();
+      acceptExternalEvents.countDown();
 
-          final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
-          logger.debug("Starting fragment {}:{} on {}:{}",
-            fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
-            endpoint.getAddress(), endpoint.getUserPort());
-        }
-      }
+      final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+      logger.debug("Starting fragment {}:{} on {}:{}",
+          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+          endpoint.getAddress(), endpoint.getUserPort());
 
       final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
           ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
@@ -275,21 +279,23 @@ public class FragmentExecutor implements Runnable {
       fail(e);
     } finally {
 
+      // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
+      // interruption after we have moved beyond this block.
+      synchronized (myThreadRef) {
+        myThreadRef.set(null);
+        Thread.interrupted();
+      }
+
       // We need to sure we countDown at least once. We'll do it here to guarantee that.
       acceptExternalEvents.countDown();
 
-      closeOutResources();
-
-      updateState(FragmentState.FINISHED);
-      // send the final state of the fragment. only the main execution thread can send the final state and it can
-      // only be sent once.
-      sendFinalState();
+      // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+      cleanup(FragmentState.FINISHED);
 
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       myThread.setName(originalThreadName);
 
-      myThreadRef.set(null);
     }
   }
 
@@ -404,10 +410,10 @@ public class FragmentExecutor implements Runnable {
         errorStateChange(current, target);
       }
 
-    // these should never be requested.
+      // these should never be requested.
+    case CANCELLED:
     case SENDING:
     case AWAITING_ALLOCATION:
-    case CANCELLED:
     default:
       errorStateChange(current, target);
     }