You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:50 UTC

[04/17] drill git commit: DRILL-3072: Update root fragment to not modify the Foreman state directly.

DRILL-3072: Update root fragment to not modify the Foreman state directly.

Instead, use the RPC mechanism to send and receive status updates.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4dcb3e75
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4dcb3e75
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4dcb3e75

Branch: refs/heads/master
Commit: 4dcb3e75643b71daa7f458e1824ac7eb7fc10cde
Parents: e58a306
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 13 19:42:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/work/foreman/Foreman.java |  9 ++--
 .../drill/exec/work/foreman/QueryManager.java   | 47 +++++---------------
 2 files changed, 17 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6840cf3..5d07b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -205,7 +205,7 @@ public class Foreman implements Runnable {
     // resume all pauses through query context
     queryContext.getExecutionControls().unpauseAll();
     // resume all pauses through all fragment contexts
-    queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+    queryManager.unpauseExecutingFragments(drillbitContext);
   }
 
   /**
@@ -810,7 +810,7 @@ public class Foreman implements Runnable {
           assert exception == null;
           queryManager.markEndTime();
           recordNewState(QueryState.CANCELLATION_REQUESTED);
-          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          queryManager.cancelExecutingFragments(drillbitContext);
           foremanResult.setCompleted(QueryState.CANCELED);
           /*
            * We don't close the foremanResult until we've gotten
@@ -833,7 +833,7 @@ public class Foreman implements Runnable {
           assert exception != null;
           queryManager.markEndTime();
           recordNewState(QueryState.FAILED);
-          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          queryManager.cancelExecutingFragments(drillbitContext);
           foremanResult.setFailed(exception);
           foremanResult.close();
           return;
@@ -934,7 +934,8 @@ public class Foreman implements Runnable {
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+    rootRunner = new FragmentExecutor(rootContext, rootFragment,
+        queryManager.newRootStatusHandler(rootContext, drillbitContext),
         rootOperator);
     final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index eed4e17..71b77c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStore;
@@ -53,6 +54,7 @@ import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 import org.apache.drill.exec.work.fragment.StatusReporter;
 
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
@@ -176,17 +178,15 @@ public class QueryManager {
 
   /**
    * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
-   * (1) Root fragment
-   *    (a) If the root is pending, delegate the cancellation to local work bus.
-   *    (b) If the root is running, cancel the fragment directly.
    *
    * For the actual cancel calls for intermediate and leaf fragments, see
    * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+   * (1) Root fragment: pending or running, send the cancel signal through a tunnel.
    * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
    *    fragments). The actual cancel is done by delegating the cancel to the work bus.
    * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
    */
-  void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+  void cancelExecutingFragments(final DrillbitContext drillbitContext) {
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       switch(data.getState()) {
@@ -194,19 +194,10 @@ public class QueryManager {
       case AWAITING_ALLOCATION:
       case RUNNING:
         final FragmentHandle handle = data.getHandle();
-        if (rootRunner.getContext().getHandle().equals(handle)) {
-          // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
-          final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
-          // Case 1.b: running root. Cancel directly.
-          if (!removed) {
-            rootRunner.cancel();
-          }
-        } else {
-          final DrillbitEndpoint endpoint = data.getEndpoint();
-          // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
-          controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+        final DrillbitEndpoint endpoint = data.getEndpoint();
+        // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+        controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
             SignalListener.Signal.CANCEL), handle);
-        }
         break;
 
       case FINISHED:
@@ -221,13 +212,9 @@ public class QueryManager {
 
   /**
    * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
-   * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
-   * control tunnel.
+   * sending any message. Resume all fragments through the control tunnel.
    */
-  void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
-    if (rootRunner != null) {
-      rootRunner.unpause();
-    }
+  void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
     final Controller controller = drillbitContext.getController();
     for(final FragmentData data : fragmentDataSet) {
       final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -447,19 +434,9 @@ public class QueryManager {
     }
   }
 
-  public StatusReporter newRootStatusHandler(final FragmentContext context) {
-    return new RootStatusReporter(context);
-  }
-
-  private class RootStatusReporter extends AbstractStatusReporter {
-    private RootStatusReporter(final FragmentContext context) {
-      super(context);
-    }
-
-    @Override
-    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
-      fragmentStatusListener.statusUpdate(status);
-    }
+  public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
+    final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
+    return new NonRootStatusReporter(context, tunnel);
   }
 
   public FragmentStatusListener getFragmentStatusListener(){