You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/12 10:32:47 UTC
[4/4] drill git commit: DRILL-2977,
DRILL-2978: Swap fragment execution method implementations,
and cancellation changes
DRILL-2977, DRILL-2978: Swap fragment execution method implementations, and cancellation changes
Execution: In WorkManager,
+ swap implementations of startFragmentPendingRemote() and addFragmentRunner()
+ warn if there are running fragments in close()
Cancellation:
+ for fragments waiting on data, delegate cancellations to WorkEventBus (in Foreman and ControlMessageHandler)
+ documentation
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8d577836
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8d577836
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8d577836
Branch: refs/heads/master
Commit: 8d57783623a4ed1691dcebf7b059ad0fd70d8ad7
Parents: eb79e80
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Mon May 11 12:02:35 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon May 11 22:40:06 2015 -0700
----------------------------------------------------------------------
.../drill/exec/rpc/control/WorkEventBus.java | 55 +++++++++++++++++---
.../org/apache/drill/exec/work/WorkManager.java | 45 +++++++++++++---
.../exec/work/batch/ControlMessageHandler.java | 23 +++++---
.../drill/exec/work/foreman/QueryManager.java | 21 ++++++--
.../exec/work/fragment/RootFragmentManager.java | 1 +
5 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index d90096a..ddd7828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -84,11 +84,13 @@ public class WorkEventBus {
}
public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
- return managers.get(handle);
+ synchronized (this) {
+ return managers.get(handle);
+ }
}
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- // check if this was a recently canceled fragment. If so, throw away message.
+ // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
if (logger.isDebugEnabled()) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
@@ -97,19 +99,58 @@ public class WorkEventBus {
}
// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- final FragmentManager m = managers.get(handle);
- if(m != null) {
- return m;
+ synchronized (this) {
+ final FragmentManager m = managers.get(handle);
+ if (m != null) {
+ return m;
+ }
}
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
+ QueryIdHelper.getQueryIdentifier(handle));
}
+ /**
+ * Removes the fragment manager (for the corresponding handle) from the work event bus. This method can be called
+ * multiple times. The manager will be removed only once (the first call).
+ * @param handle the handle to the fragment
+ */
public void removeFragmentManager(final FragmentHandle handle) {
if (logger.isDebugEnabled()) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
}
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
+
+ synchronized (this) {
+ final FragmentManager manager = managers.get(handle);
+ if (manager != null) {
+ recentlyFinishedFragments.put(handle, 1);
+ managers.remove(handle);
+ } else {
+ logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
+ }
+ }
+ }
+
+ /**
+ * Cancels and removes the fragment manager (for the corresponding handle) from the work event bus. Currently used
* for fragments waiting on data (root and intermediate).
+ * @param handle the handle to the fragment
* @return true if the fragment was found and removed from the event bus, false otherwise
+ */
+ public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+ }
+
+ synchronized (this) {
+ final FragmentManager manager = managers.get(handle);
+ if (manager == null) {
+ return false;
+ }
+
+ manager.cancel();
+ recentlyFinishedFragments.put(handle, 1);
+ managers.remove(handle);
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1d3a0b0..5939113 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.RpcException;
@@ -178,6 +179,16 @@ public class WorkManager implements AutoCloseable {
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
}
+
+ if (!runningFragments.isEmpty()) {
+ logger.warn("Closing WorkManager but there are {} running fragments.", runningFragments.size());
+ if (logger.isDebugEnabled()) {
+ for (final FragmentHandle handle : runningFragments.keySet()) {
+ logger.debug("Fragment still running: {} status: {}", QueryIdHelper.getQueryIdentifier(handle),
+ runningFragments.get(handle).getStatus());
+ }
+ }
+ }
}
public DrillbitContext getContext() {
@@ -261,14 +272,10 @@ public class WorkManager implements AutoCloseable {
return dContext;
}
- public void startFragmentPendingRemote(final FragmentManager handler) {
- final FragmentExecutor fragmentExecutor = handler.getRunnable();
- // cancelled fragment managers will return null fragment executors
- if (fragmentExecutor != null) {
- executor.execute(fragmentExecutor);
- }
- }
-
+ /**
+ * Currently used to start a root fragment that is not blocked on data, and leaf fragments.
+ * @param fragmentExecutor the executor to run
+ */
public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
runningFragments.put(fragmentHandle, fragmentExecutor);
@@ -276,6 +283,28 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
+ indicateIfSafeToExit();
+ }
+ });
+ }
+
+ /**
+ * Currently used to start a root fragment that is blocked on data, and intermediate fragments. This method is
* called by {@link org.apache.drill.exec.rpc.data.DataResponseHandlerImpl#handle} when the first batch arrives.
+ * @param fragmentManager the manager for the fragment
+ */
+ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
+ final FragmentHandle fragmentHandle = fragmentManager.getHandle();
+ final FragmentExecutor fragmentExecutor = fragmentManager.getRunnable();
+ if (fragmentExecutor == null) {
+ // the fragment was most likely cancelled
+ return;
+ }
+ runningFragments.put(fragmentHandle, fragmentExecutor);
+ executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
+ @Override
+ protected void cleanup() {
+ runningFragments.remove(fragmentHandle);
workBus.removeFragmentManager(fragmentHandle);
indicateIfSafeToExit();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index d12e6d5..421ad7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -159,21 +159,32 @@ public class ControlMessageHandler {
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
private Ack cancelFragment(final FragmentHandle handle) {
- // cancel a pending fragment
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
- if (manager != null) {
- manager.cancel();
+ /**
+ * For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}.
+ * In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and
+ * "inactive" refers to FINISHED, CANCELLATION_REQUESTED, CANCELLED, FAILED
+ */
+
+ // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
+ // work bus.
+ final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ if (removed) {
return Acks.OK;
}
- // cancel a running fragment
+ // Case 3: Cancel active leaf fragment. Such a fragment will be with the worker bee if and only if it is running.
+ // Cancel directly in this case.
final FragmentExecutor runner = bee.getFragmentRunner(handle);
if (runner != null) {
runner.cancel();
return Acks.OK;
}
- // fragment completed or does not exist
+ // Other cases: Fragment completed or does not exist. Currently known cases:
// (1) Leaf or intermediate fragment that is inactive: although we should not receive a cancellation
// request, it is possible that this handler received a cancel signal before the fragment state was
// updated in the QueryManager.
+ // (2) Unknown fragment.
logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
return Acks.OK;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 090a377..84a38a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
@@ -174,7 +175,16 @@ public class QueryManager {
}
/**
- * Stop all fragments with a currently active status.
+ * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
+ * (1) Root fragment
+ * (a) If the root is pending, delegate the cancellation to local work bus.
+ * (b) If the root is running, cancel the fragment directly.
+ *
+ * For the actual cancel calls for intermediate and leaf fragments, see
+ * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}
+ * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
+ * fragments). The actual cancel is done by delegating the cancel to the work bus.
+ * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
final Controller controller = drillbitContext.getController();
@@ -183,11 +193,16 @@ public class QueryManager {
case SENDING:
case AWAITING_ALLOCATION:
case RUNNING:
- if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
+ final FragmentHandle handle = data.getHandle();
+ if (rootRunner.getContext().getHandle().equals(handle)) {
+ // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus.
+ final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ // Case 1.b: running root. Cancel directly.
+ if (!removed) {
rootRunner.cancel();
+ }
} else {
final DrillbitEndpoint endpoint = data.getEndpoint();
- final FragmentHandle handle = data.getHandle();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
SignalListener.Signal.CANCEL), handle);
http://git-wip-us.apache.org/repos/asf/drill/blob/8d577836/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 67ef9b8..b770a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -67,6 +67,7 @@ public class RootFragmentManager implements FragmentManager {
@Override
public void cancel() {
cancel = true;
+ runner.cancel();
}
@Override