Posted to dev@drill.apache.org by vrozov <gi...@git.apache.org> on 2017/11/17 01:33:39 UTC
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
GitHub user vrozov opened a pull request:
https://github.com/apache/drill/pull/1041
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/vrozov/drill DRILL-5961
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/drill/pull/1041.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1041
----
commit f7e0c44c1277cc3fa0cdf466e01521974c98262d
Author: Vlad Rozov <vr...@apache.org>
Date: 2017-11-15T00:24:01Z
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
commit 575dec0fdc0dc4efb50569afa568c06f21546e6e
Author: Vlad Rozov <vr...@apache.org>
Date: 2017-11-15T00:35:09Z
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
----
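In the removed getFragmentManager (see the WorkEventBus diff in this thread), a handle found in recentlyFinishedFragments was silently ignored. A handle with no registered manager raised FragmentSetupException. If that cache drops entries after some time, a late message for a fragment that finished long ago takes the exception path. The "> 10 min" in the title suggests an expiry of roughly ten minutes, but the actual cache configuration is not shown here, so treat that as an assumption. The following is a hypothetical, self-contained model, not Drill code: handles are Strings, time is passed in explicitly, and SetupException stands in for FragmentSetupException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of the lookup change; names and types are illustrative only. */
final class LookupModel {

  /** Stands in for FragmentSetupException. */
  static final class SetupException extends Exception {
    SetupException(String message) { super(message); }
  }

  private final ConcurrentMap<String, Object> managers = new ConcurrentHashMap<>();
  // handle -> finish time; entries older than expiryMillis count as forgotten (assumed expiry)
  private final Map<String, Long> recentlyFinished = new ConcurrentHashMap<>();
  private final long expiryMillis;

  LookupModel(long expiryMillis) { this.expiryMillis = expiryMillis; }

  void add(String handle, Object manager) { managers.put(handle, manager); }

  /** Removes the manager and remembers when the fragment finished. */
  void finish(String handle, long nowMillis) {
    if (managers.remove(handle) != null) {
      recentlyFinished.put(handle, nowMillis);
    }
  }

  /** Old behavior: returns null only while the "recently finished" record is still fresh. */
  Object oldLookup(String handle, long nowMillis) throws SetupException {
    Long finishedAt = recentlyFinished.get(handle);
    if (finishedAt != null && nowMillis - finishedAt < expiryMillis) {
      return null; // late message for a finished fragment: ignored
    }
    Object m = managers.get(handle);
    if (m != null) {
      return m;
    }
    throw new SetupException("No manager for " + handle);
  }

  /** New behavior: a plain map lookup; null means "not registered or already gone". */
  Object newLookup(String handle) {
    return managers.get(handle);
  }

  public static void main(String[] args) {
    LookupModel bus = new LookupModel(10 * 60 * 1000L); // assumed ten-minute expiry
    bus.add("q1:1:0", new Object());
    bus.finish("q1:1:0", 0L);
    long elevenMinutes = 11 * 60 * 1000L;
    try {
      bus.oldLookup("q1:1:0", elevenMinutes);
      System.out.println("old: ignored");
    } catch (SetupException e) {
      System.out.println("old: " + e.getMessage()); // prints "old: No manager for q1:1:0"
    }
    System.out.println("new: " + bus.newLookup("q1:1:0")); // prints "new: null"
  }
}
```

The new getFragmentManager drops both the cache and the exception, so callers have to handle a null result.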
---
[GitHub] drill issue #1041: DRILL-5961: For long running queries (> 10 min) Drill may...
Posted by priteshm <gi...@git.apache.org>.
Github user priteshm commented on the issue:
https://github.com/apache/drill/pull/1041
@parthchandra or @adityakishore, since you've both made changes to these files before, can you review the changes here?
---
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/drill/pull/1041
---
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1041#discussion_r155433245
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---
@@ -277,7 +277,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
- workBus.removeFragmentManager(fragmentHandle);
+ if (!fragmentManager.isCancelled()) {
+ workBus.removeFragmentManager(fragmentHandle, false);
--- End diff --
If cleanup is not called as a result of FragmentManager cancellation, it is part of the regular cleanup after the run completes.
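A minimal model of this rule, assuming cancellation always goes through removeFragmentManager(handle, true): by the time cleanup runs for a cancelled fragment, its manager is already off the bus, so cleanup only removes managers whose fragments ran to completion, and it does so without cancelling. Skipping the call for cancelled managers also avoids the "manager is not found" warning that the new removeFragmentManager logs on a second removal. CleanupModel, Manager, cancelFragment and the String handles are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of the WorkManager cleanup guard; not Drill's classes. */
final class CleanupModel {

  static final class Manager {
    private volatile boolean cancelled;
    boolean isCancelled() { return cancelled; }
    void cancel() { cancelled = true; }
  }

  final ConcurrentMap<String, Manager> bus = new ConcurrentHashMap<>();

  /** Mirrors removeFragmentManager(handle, cancel): the manager is removed at most once. */
  boolean remove(String handle, boolean cancel) {
    Manager m = bus.remove(handle);
    if (m == null) {
      return false;
    }
    if (cancel) {
      m.cancel();
    }
    return true;
  }

  /** Cancellation path: cancel and remove in one step. */
  boolean cancelFragment(String handle) {
    return remove(handle, true);
  }

  /** Regular cleanup: a cancelled manager was already removed, so only completed ones are removed here. */
  void cleanup(String handle, Manager manager) {
    if (!manager.isCancelled()) {
      remove(handle, false);
    }
  }

  public static void main(String[] args) {
    CleanupModel model = new CleanupModel();
    Manager done = new Manager();
    model.bus.put("completed", done);
    model.cleanup("completed", done);
    System.out.println(model.bus.containsKey("completed") + " " + done.isCancelled()); // false false

    Manager killed = new Manager();
    model.bus.put("cancelled", killed);
    model.cancelFragment("cancelled");
    model.cleanup("cancelled", killed); // no second removal attempt
    System.out.println(model.bus.containsKey("cancelled") + " " + killed.isCancelled()); // false true
  }
}
```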
---
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/1041#discussion_r155397359
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---
@@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {
public void addFragmentManager(final FragmentManager fragmentManager) {
if (logger.isDebugEnabled()) {
- logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+ logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
}
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
- if (old != null) {
- throw new IllegalStateException(
- "Tried to set fragment manager when has already been set for the provided fragment handle.");
- }
- }
-
- public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
- synchronized (this) {
- return managers.get(handle);
+ if (old != null) {
+ throw new IllegalStateException(
+ String.format("Manager %s for fragment %s already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
}
}
- public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- synchronized (this) {
- // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
- if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
- }
- return null;
- }
-
- // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- final FragmentManager m = managers.get(handle);
- if (m != null) {
- return m;
- }
- }
- throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
- + QueryIdHelper.getQueryIdentifier(handle));
+ public FragmentManager getFragmentManager(final FragmentHandle handle) {
+ return managers.get(handle);
}
/**
- * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
- * multiple times. The manager will be removed only once (the first call).
- * @param handle the handle to the fragment
- */
- public void removeFragmentManager(final FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager != null) {
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
- } else {
- logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
- }
- }
- }
-
- /**
- * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
- * for fragments waiting on data (root and intermediate).
+ * Optionally cancels and removes fragment manager (for the corresponding handle) from the work event bus. Currently used
+ * for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
+ * only once (the first call).
* @param handle the handle to the fragment
+ * @param cancel whether to also cancel the removed manager
* @return if the fragment was found and removed from the event bus
*/
- public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
- if (logger.isDebugEnabled()) {
- logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
- }
-
- synchronized (this) {
- final FragmentManager manager = managers.get(handle);
- if (manager == null) {
- return false;
+ public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
+ final FragmentManager manager = managers.remove(handle);
+ if (manager != null) {
+ assert !manager.isCancelled() : String.format("Fragment %s manager %s is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
+ if (cancel) {
+ manager.cancel();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancelled and removed" : "Removed",
+ QueryIdHelper.getQueryIdentifier(handle), manager);
}
-
- manager.cancel();
- recentlyFinishedFragments.put(handle, 1);
- managers.remove(handle);
return true;
+ } else if (logger.isWarnEnabled()) {
+ logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
}
+ return false;
--- End diff --
Seems like this is redundant. Doesn't seem like we ever return false, and the callers never seem to check.
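On the return value: the new removeFragmentManager relies on ConcurrentMap.remove, which atomically returns the removed value or null, so a repeat call for the same handle, or a call for a handle that was never registered, takes the false branch. A minimal model with hypothetical names follows. It also uses %s in String.format, since String.format does not interpret the logger-style {} placeholders that appear in the diff's exception and assertion messages.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of the boolean contract of removeFragmentManager(handle, cancel). */
final class RemoveResultModel {
  private final ConcurrentMap<String, Object> managers = new ConcurrentHashMap<>();

  /** Registers a manager; a second registration for the same handle is an error. */
  void add(String handle) {
    Object old = managers.putIfAbsent(handle, new Object());
    if (old != null) {
      throw new IllegalStateException(String.format("Manager for fragment %s already exists.", handle));
    }
  }

  /** ConcurrentMap.remove is atomic, so only one caller per registration sees true. */
  boolean remove(String handle) {
    return managers.remove(handle) != null;
  }

  public static void main(String[] args) {
    RemoveResultModel bus = new RemoveResultModel();
    bus.add("q1:2:0");
    System.out.println(bus.remove("q1:2:0")); // true: the first call removes the manager
    System.out.println(bus.remove("q1:2:0")); // false: already removed
    System.out.println(bus.remove("q1:3:0")); // false: never registered
  }
}
```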
---
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:
https://github.com/apache/drill/pull/1041#discussion_r155433860
ControlMessageHandler checks the result on line 196.
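The result matters to a caller that has a fallback. The following is a hypothetical caller, not the code at line 196 (which is not quoted in this thread). A cancel request first tries the work bus, which per the javadoc in the diff holds root and intermediate fragments waiting on data. Only when removal returns false does it look for the fragment in a second, assumed registry of running fragments.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical caller that branches on the result of a work-bus removal. */
final class CancelRequestModel {

  interface Cancellable { void cancel(); }

  final ConcurrentMap<String, Cancellable> workBus = new ConcurrentHashMap<>();          // fragments waiting on data
  final ConcurrentMap<String, Cancellable> runningFragments = new ConcurrentHashMap<>(); // assumed second registry

  /** Same shape as removeFragmentManager(handle, cancel). */
  boolean removeFromWorkBus(String handle, boolean cancel) {
    Cancellable m = workBus.remove(handle);
    if (m == null) {
      return false;
    }
    if (cancel) {
      m.cancel();
    }
    return true;
  }

  /** Returns which path handled the request. */
  String cancelFragment(String handle) {
    if (removeFromWorkBus(handle, true)) {
      return "work bus"; // the bus owned the fragment and cancelled it
    }
    Cancellable runner = runningFragments.get(handle);
    if (runner != null) {
      runner.cancel(); // fall back to cancelling the fragment directly
      return "runner";
    }
    return "unknown"; // already finished or never existed
  }

  public static void main(String[] args) {
    CancelRequestModel handler = new CancelRequestModel();
    int[] cancels = {0};
    handler.workBus.put("intermediate", () -> cancels[0]++);
    handler.runningFragments.put("leaf", () -> cancels[0]++);
    System.out.println(handler.cancelFragment("intermediate")); // work bus
    System.out.println(handler.cancelFragment("leaf"));         // runner
    System.out.println(handler.cancelFragment("intermediate")); // unknown: already removed
    System.out.println(cancels[0]);                             // 2
  }
}
```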
---
[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/1041#discussion_r155396889
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---
@@ -277,7 +277,9 @@ public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
- workBus.removeFragmentManager(fragmentHandle);
+ if (!fragmentManager.isCancelled()) {
+ workBus.removeFragmentManager(fragmentHandle, false);
--- End diff --
Not sure why you don't want to cancel here.
---