You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:50 UTC
[04/17] drill git commit: DRILL-3072: Update root fragment to not to
modify the Foreman state directly.
DRILL-3072: Update root fragment to not to modify the Foreman state directly.
Instead use RPC mechanism to send and receive status updates
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4dcb3e75
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4dcb3e75
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4dcb3e75
Branch: refs/heads/master
Commit: 4dcb3e75643b71daa7f458e1824ac7eb7fc10cde
Parents: e58a306
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 13 19:42:24 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:52 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/work/foreman/Foreman.java | 9 ++--
.../drill/exec/work/foreman/QueryManager.java | 47 +++++---------------
2 files changed, 17 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 6840cf3..5d07b49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -205,7 +205,7 @@ public class Foreman implements Runnable {
// resume all pauses through query context
queryContext.getExecutionControls().unpauseAll();
// resume all pauses through all fragment contexts
- queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+ queryManager.unpauseExecutingFragments(drillbitContext);
}
/**
@@ -810,7 +810,7 @@ public class Foreman implements Runnable {
assert exception == null;
queryManager.markEndTime();
recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setCompleted(QueryState.CANCELED);
/*
* We don't close the foremanResult until we've gotten
@@ -833,7 +833,7 @@ public class Foreman implements Runnable {
assert exception != null;
queryManager.markEndTime();
recordNewState(QueryState.FAILED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ queryManager.cancelExecutingFragments(drillbitContext);
foremanResult.setFailed(exception);
foremanResult.close();
return;
@@ -934,7 +934,8 @@ public class Foreman implements Runnable {
queryManager.addFragmentStatusTracker(rootFragment, true);
- rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+ rootRunner = new FragmentExecutor(rootContext, rootFragment,
+ queryManager.newRootStatusHandler(rootContext, drillbitContext),
rootOperator);
final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index eed4e17..71b77c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStore;
@@ -53,6 +54,7 @@ import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
import org.apache.drill.exec.work.fragment.StatusReporter;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
@@ -176,17 +178,15 @@ public class QueryManager {
/**
* Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
- * (1) Root fragment
- * (a) If the root is pending, delegate the cancellation to local work bus.
- * (b) If the root is running, cancel the fragment directly.
*
* For the actual cancel calls for intermediate and leaf fragments, see
* {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+ * (1) Root fragment: pending or running, send the cancel signal through a tunnel.
* (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
* fragments). The actual cancel is done by delegating the cancel to the work bus.
* (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
- void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ void cancelExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
switch(data.getState()) {
@@ -194,19 +194,10 @@ public class QueryManager {
case AWAITING_ALLOCATION:
case RUNNING:
final FragmentHandle handle = data.getHandle();
- if (rootRunner.getContext().getHandle().equals(handle)) {
- // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
- final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
- // Case 1.b: running root. Cancel directly.
- if (!removed) {
- rootRunner.cancel();
- }
- } else {
- final DrillbitEndpoint endpoint = data.getEndpoint();
- // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
- controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+ controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.CANCEL), handle);
- }
break;
case FINISHED:
@@ -221,13 +212,9 @@ public class QueryManager {
/**
* Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
- * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
- * control tunnel.
+ * sending any message. Resume all fragments through the control tunnel.
*/
- void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
- if (rootRunner != null) {
- rootRunner.unpause();
- }
+ void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -447,19 +434,9 @@ public class QueryManager {
}
}
- public StatusReporter newRootStatusHandler(final FragmentContext context) {
- return new RootStatusReporter(context);
- }
-
- private class RootStatusReporter extends AbstractStatusReporter {
- private RootStatusReporter(final FragmentContext context) {
- super(context);
- }
-
- @Override
- protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
- fragmentStatusListener.statusUpdate(status);
- }
+ public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) {
+ final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint());
+ return new NonRootStatusReporter(context, tunnel);
}
public FragmentStatusListener getFragmentStatusListener(){