You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/12 10:32:47 UTC

[4/4] drill git commit: DRILL-2977, DRILL-2978: Swap fragment execution method implementations, and cancellation changes

DRILL-2977, DRILL-2978: Swap fragment execution method implementations, and cancellation changes

Execution: In WorkManager,
+ swap implementations of startFragmentPendingRemote() and addFragmentRunner()
+ warn if there are running fragments in close()

Cancellation:
+ for fragments waiting on data, delegate cancellation to WorkEventBus (on the Foreman side in QueryManager, and in ControlMessageHandler)
+ document the cancellation cases for root, intermediate and leaf fragments


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8d577836
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8d577836
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8d577836

Branch: refs/heads/master
Commit: 8d57783623a4ed1691dcebf7b059ad0fd70d8ad7
Parents: eb79e80
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Mon May 11 12:02:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/rpc/control/WorkEventBus.java    | 55 +++++++++++++++++---
 .../org/apache/drill/exec/work/WorkManager.java | 45 +++++++++++++---
 .../exec/work/batch/ControlMessageHandler.java  | 23 +++++---
 .../drill/exec/work/foreman/QueryManager.java   | 21 ++++++--
 .../exec/work/fragment/RootFragmentManager.java |  1 +
 5 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index d90096a..ddd7828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -84,11 +84,13 @@ public class WorkEventBus {
   }
 
   public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
-    return managers.get(handle);
+    synchronized (this) { // same monitor as removeFragmentManager and cancelAndRemoveFragmentManagerIfExists
+      return managers.get(handle);
+    }
   }
 
   public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
-    // check if this was a recently canceled fragment.  If so, throw away message.
+    // Check if this was a recently finished (completed or cancelled) fragment.  If so, throw away message.
     if (recentlyFinishedFragments.asMap().containsKey(handle)) {
       if (logger.isDebugEnabled()) {
         logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
@@ -97,19 +99,58 @@ public class WorkEventBus {
     }
 
     // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
-    final FragmentManager m = managers.get(handle);
-    if(m != null) {
-      return m;
+    synchronized (this) {
+      final FragmentManager m = managers.get(handle);
+      if (m != null) {
+        return m;
+      }
     }
     throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
         + QueryIdHelper.getQueryIdentifier(handle));
   }
 
+  /**
+   * Removes the fragment manager for the given handle from the work event bus. This method can be called
+   * multiple times; only the first call removes the manager, later calls (e.g. after a cancel) just log a warning.
+   * @param handle the handle to the fragment
+   */
   public void removeFragmentManager(final FragmentHandle handle) {
     if (logger.isDebugEnabled()) {
       logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
     }
-    recentlyFinishedFragments.put(handle,  1);
-    managers.remove(handle);
+
+    synchronized (this) {
+      final FragmentManager manager = managers.get(handle);
+      if (manager != null) {
+        recentlyFinishedFragments.put(handle, 1); // lets getFragmentManager drop late messages for this fragment
+        managers.remove(handle);
+      } else {
+        logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
+      }
+    }
+  }
+
+  /**
+   * Cancels and removes the fragment manager for the given handle from the work event bus. Currently used
+   * for fragments waiting on data (root and intermediate).
+   * @param handle the handle to the fragment
+   * @return true if the fragment was found, cancelled and removed from the event bus; false if it was not present
+   */
+  public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    }
+
+    synchronized (this) {
+      final FragmentManager manager = managers.get(handle);
+      if (manager == null) {
+        return false;
+      }
+
+      manager.cancel();
+      recentlyFinishedFragments.put(handle, 1); // lets getFragmentManager drop late messages for this fragment
+      managers.remove(handle);
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1d3a0b0..5939113 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.RpcException;
@@ -178,6 +179,16 @@ public class WorkManager implements AutoCloseable {
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
     }
+
+    if (!runningFragments.isEmpty()) {
+      logger.warn("Closing WorkManager but there are {} running fragments.", runningFragments.size());
+      if (logger.isDebugEnabled()) {
+        for (final FragmentHandle handle : runningFragments.keySet()) {
+          logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(handle),
+            runningFragments.get(handle).getStatus());
+        }
+      }
+    }
   }
 
   public DrillbitContext getContext() {
@@ -261,14 +272,10 @@ public class WorkManager implements AutoCloseable {
       return dContext;
     }
 
-    public void startFragmentPendingRemote(final FragmentManager handler) {
-      final FragmentExecutor fragmentExecutor = handler.getRunnable();
-      // cancelled fragment managers will return null fragment executors
-      if (fragmentExecutor != null) {
-        executor.execute(fragmentExecutor);
-      }
-    }
-
+    /**
+     * Currently used to start a root fragment that is not blocked on data, and leaf fragments.
+     * @param fragmentExecutor the executor to run
+     */
     public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
       final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
       runningFragments.put(fragmentHandle, fragmentExecutor);
@@ -276,6 +283,28 @@ public class WorkManager implements AutoCloseable {
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
+          indicateIfSafeToExit();
+        }
+      });
+    }
+
+    /**
+     * Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
+     * called, when the first batch arrives, by {@link org.apache.drill.exec.rpc.data.DataResponseHandlerImpl#handle}
+     * @param fragmentManager the manager for the fragment
+     */
+    public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
+      final FragmentHandle fragmentHandle = fragmentManager.getHandle();
+      final FragmentExecutor fragmentExecutor = fragmentManager.getRunnable();
+      if (fragmentExecutor == null) {
+        // the fragment was most likely cancelled
+        return;
+      }
+      runningFragments.put(fragmentHandle, fragmentExecutor);
+      executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
+        @Override
+        protected void cleanup() {
+          runningFragments.remove(fragmentHandle);
           workBus.removeFragmentManager(fragmentHandle);
           indicateIfSafeToExit();
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index d12e6d5..421ad7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -159,21 +159,32 @@ public class ControlMessageHandler {
    * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
   private Ack cancelFragment(final FragmentHandle handle) {
-    // cancel a pending fragment
-    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
-    if (manager != null) {
-      manager.cancel();
+    /*
+     * Case 1 (the root fragment) is handled by the Foreman; see QueryManager#cancelExecutingFragments.
+     * In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and
+     * "inactive" refers to FINISHED, CANCELLATION_REQUESTED, CANCELLED, FAILED.
+     */
+
+    // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
+    // work bus.
+    final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+    if (removed) {
       return Acks.OK;
     }
 
-    // cancel a running fragment
+    // Case 3: Cancel active leaf fragment. Such a fragment will be with the worker bee if and only if it is running.
+    // Cancel it directly in this case.
     final FragmentExecutor runner = bee.getFragmentRunner(handle);
     if (runner != null) {
       runner.cancel();
       return Acks.OK;
     }
 
-    // fragment completed or does not exist
+    // Other cases: Fragment completed or does not exist. Currently known cases:
+    // (1) Leaf or intermediate fragment that is inactive: although we should not receive a cancellation
+    //     request, it is possible that this handler received the cancel signal before the fragment state
+    //     was updated in the QueryManager.
+    // (2) Unknown fragment.
     logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
     return Acks.OK;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 090a377..84a38a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
@@ -174,7 +175,16 @@ public class QueryManager {
   }
 
   /**
-   * Stop all fragments with a currently active status.
+   * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
+   * (1) Root fragment
+   *    (a) If the root is pending, delegate the cancellation to local work bus.
+   *    (b) If the root is running, cancel the fragment directly.
+   *
+   * For the actual cancel calls for intermediate and leaf fragments, see
+   * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+   * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
+   *    fragments). The actual cancel is done by delegating the cancel to the work bus.
+   * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
    */
   void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
     final Controller controller = drillbitContext.getController();
@@ -183,11 +193,16 @@ public class QueryManager {
       case SENDING:
       case AWAITING_ALLOCATION:
       case RUNNING:
-        if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
+        final FragmentHandle handle = data.getHandle();
+        if (rootRunner.getContext().getHandle().equals(handle)) {
+          // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
+          final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+          // Case 1.b: running root. Cancel directly.
+          if (!removed) {
             rootRunner.cancel();
+          }
         } else {
           final DrillbitEndpoint endpoint = data.getEndpoint();
-          final FragmentHandle handle = data.getHandle();
           // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
           controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
             SignalListener.Signal.CANCEL), handle);

http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 67ef9b8..b770a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -67,6 +67,7 @@ public class RootFragmentManager implements FragmentManager {
   @Override
   public void cancel() {
     cancel = true;
+    runner.cancel(); // propagate the cancel to the runner, not just set this manager's flag
   }
 
   @Override