You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/01/12 18:20:45 UTC
[06/11] drill git commit: DRILL-5961: For long running queries (> 10
min) Drill may raise FragmentSetupException for completed/cancelled fragments
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
This closes #1041
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5af3aef
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5af3aef
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5af3aef
Branch: refs/heads/master
Commit: c5af3aefe79c34d5b76bec8ce55875decca9e617
Parents: cbb79e5
Author: Vlad Rozov <vr...@apache.org>
Authored: Tue Nov 14 16:24:01 2017 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Thu Jan 11 17:11:41 2018 -0800
----------------------------------------------------------------------
.../drill/exec/rpc/control/WorkEventBus.java | 91 +++++---------------
.../org/apache/drill/exec/work/WorkManager.java | 4 +-
.../exec/work/batch/ControlMessageHandler.java | 10 +--
.../work/fragment/NonRootFragmentManager.java | 5 --
.../drill/exec/rpc/data/TestBitBitKerberos.java | 3 -
.../apache/drill/exec/rpc/data/TestBitRpc.java | 1 -
6 files changed, 30 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 3e461ef..c889e07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -39,10 +39,6 @@ public class WorkEventBus {
private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap();
private final ConcurrentMap<QueryId, FragmentStatusListener> listeners =
new ConcurrentHashMap<>(16, 0.75f, 16);
- private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
- .maximumSize(10000)
- .expireAfterWrite(10, TimeUnit.MINUTES)
- .build();
public void removeFragmentStatusListener(final QueryId queryId) {
if (logger.isDebugEnabled()) {
@@ -74,83 +70,42 @@ public class WorkEventBus {
public void addFragmentManager(final FragmentManager fragmentManager) {
if (logger.isDebugEnabled()) {
- logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+ logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
}
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
- if (old != null) {
- throw new IllegalStateException(
- "Tried to set fragment manager when has already been set for the provided fragment handle.");
- }
- }
-
- public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
- synchronized (this) {
- return managers.get(handle);
+ if (old != null) { // putIfAbsent returned an existing mapping: a manager is already registered for this handle
+ throw new IllegalStateException(
+ String.format("Manager %s for fragment %s already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
}
}
- public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- synchronized (this) {
- // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
- if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
- }
- return null;
- }
-
- // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- final FragmentManager m = managers.get(handle);
- if (m != null) {
- return m;
- }
- }
- throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
- + QueryIdHelper.getQueryIdentifier(handle));
+ public FragmentManager getFragmentManager(final FragmentHandle handle) {
+ return managers.get(handle);
}
/**
- * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
- * multiple times. The manager will be removed only once (the first call).
- * @param handle the handle to the fragment
- */
- public void removeFragmentManager(final FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager != null) {
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
- } else {
- logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
- }
- }
- }
-
- /**
- * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
- * for fragments waiting on data (root and intermediate).
+ * Optionally cancels and removes the fragment manager (for the corresponding handle) from the work event bus. Currently, used
+ * for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
+ * only once (the first call); later calls log a warning and return {@code false}.
* @param handle the handle to the fragment
+ * @param cancel if {@code true}, the removed manager is also cancelled; if {@code false}, it is only removed
* @return if the fragment was found and removed from the event bus
*/
- public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager == null) {
- return false;
+ public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
+ final FragmentManager manager = managers.remove(handle);
+ if (manager != null) {
+ assert !manager.isCancelled() : String.format("Fragment %s manager %s is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
+ if (cancel) {
+ manager.cancel();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed",
+ QueryIdHelper.getQueryIdentifier(handle), manager);
}
-
- manager.cancel();
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
return true;
+ } else if (logger.isWarnEnabled()) {
+ logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e935819..d75668c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -325,7 +325,9 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
- workBus.removeFragmentManager(fragmentHandle);
+ if (!fragmentManager.isCancelled()) {
+ workBus.removeFragmentManager(fragmentHandle, false);
+ }
indicateIfSafeToExit();
}
});
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 2bbaf1b..972b56a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -193,7 +193,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
// Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
// work bus.
- final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true);
if (removed) {
return Acks.OK;
}
@@ -217,7 +217,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
private Ack resumeFragment(final FragmentHandle handle) {
// resume a pending fragment
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
if (manager != null) {
manager.unpause();
return Acks.OK;
@@ -237,14 +237,12 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
- final FragmentManager manager =
- bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
- FragmentExecutor executor;
if (manager != null) {
manager.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
- executor = bee.getFragmentRunner(finishedReceiver.getSender());
+ final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender());
if (executor != null) {
executor.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 7d1585b..17a5965 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -57,9 +57,4 @@ public class NonRootFragmentManager extends AbstractFragmentManager {
public void receivingFragmentFinished(final FragmentHandle handle) {
fragmentExecutor.receivingFragmentFinished(handle);
}
-
- @Override
- public synchronized void cancel() {
- super.cancel();
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index a24b0db..834f108 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -225,7 +225,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
new NonStrictExpectations() {{
- workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};
@@ -273,7 +272,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
updateTestCluster(1, newConfig);
new NonStrictExpectations() {{
- workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};
@@ -322,7 +320,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
updateTestCluster(1, newConfig);
new NonStrictExpectations() {{
- workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
workBus.getFragmentManager( (FragmentHandle) any); result = manager;
}};
http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
index 8c53915..1e8318f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
@@ -105,7 +105,6 @@ public class TestBitRpc extends ExecTest {
new NonStrictExpectations() {{
- workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
workBus.getFragmentManager( (FragmentHandle) any); result = fman;
}};