You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/01/12 18:20:45 UTC

[06/11] drill git commit: DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments

DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments

This closes #1041


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5af3aef
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5af3aef
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5af3aef

Branch: refs/heads/master
Commit: c5af3aefe79c34d5b76bec8ce55875decca9e617
Parents: cbb79e5
Author: Vlad Rozov <vr...@apache.org>
Authored: Tue Nov 14 16:24:01 2017 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Thu Jan 11 17:11:41 2018 -0800

----------------------------------------------------------------------
 .../drill/exec/rpc/control/WorkEventBus.java    | 91 +++++---------------
 .../org/apache/drill/exec/work/WorkManager.java |  4 +-
 .../exec/work/batch/ControlMessageHandler.java  | 10 +--
 .../work/fragment/NonRootFragmentManager.java   |  5 --
 .../drill/exec/rpc/data/TestBitBitKerberos.java |  3 -
 .../apache/drill/exec/rpc/data/TestBitRpc.java  |  1 -
 6 files changed, 30 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 3e461ef..c889e07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -39,10 +39,6 @@ public class WorkEventBus {
   private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap();
   private final ConcurrentMap<QueryId, FragmentStatusListener> listeners =
       new ConcurrentHashMap<>(16, 0.75f, 16);
-  private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
-          .maximumSize(10000)
-          .expireAfterWrite(10, TimeUnit.MINUTES)
-          .build();
 
   public void removeFragmentStatusListener(final QueryId queryId) {
     if (logger.isDebugEnabled()) {
@@ -74,83 +70,42 @@ public class WorkEventBus {
 
   public void addFragmentManager(final FragmentManager fragmentManager) {
     if (logger.isDebugEnabled()) {
-      logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+      logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()), fragmentManager);
     }
     final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
-      if (old != null) {
-        throw new IllegalStateException(
-            "Tried to set fragment manager when has already been set for the provided fragment handle.");
-    }
-  }
-
-  public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
-    synchronized (this) {
-      return managers.get(handle);
+    if (old != null) {
+      throw new IllegalStateException(
+          String.format("Manager %s for fragment %s already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
     }
   }
 
-  public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
-    synchronized (this) {
-      // Check if this was a recently finished (completed or cancelled) fragment.  If so, throw away message.
-      if (recentlyFinishedFragments.asMap().containsKey(handle)) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
-        }
-        return null;
-      }
-
-      // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
-      final FragmentManager m = managers.get(handle);
-      if (m != null) {
-        return m;
-      }
-    }
-    throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
-        + QueryIdHelper.getQueryIdentifier(handle));
+  public FragmentManager getFragmentManager(final FragmentHandle handle) {
+    return managers.get(handle);
   }
 
   /**
-   * Removes fragment manager (for the corresponding the handle) from the work event bus. This method can be called
-   * multiple times. The manager will be removed only once (the first call).
-   * @param handle the handle to the fragment
-   */
-  public void removeFragmentManager(final FragmentHandle handle) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
-    }
-
-    synchronized (this) {
-      final FragmentManager manager = managers.get(handle);
-      if (manager != null) {
-        recentlyFinishedFragments.put(handle, 1);
-        managers.remove(handle);
-      } else {
-        logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
-      }
-    }
-  }
-
-  /**
-   * Cancels and removes fragment manager (for the corresponding the handle) from the work event bus, Currently, used
-   * for fragments waiting on data (root and intermediate).
+   * Removes the fragment manager for the given handle from the work event bus and optionally cancels it. Currently used
+   * for fragments waiting on data (root and intermediate). This method can be called multiple times. The manager will be removed
+   * only once (the first call).
    * @param handle the handle to the fragment
+   * @param cancel if {@code true}, {@code cancel()} is called on the manager after it is removed from the bus
    * @return if the fragment was found and removed from the event bus
    */
-  public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
-    }
-
-    synchronized (this) {
-      final FragmentManager manager = managers.get(handle);
-      if (manager == null) {
-        return false;
+  public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel) {
+    final FragmentManager manager = managers.remove(handle);
+    if (manager != null) {
+      assert !manager.isCancelled() : String.format("Fragment %s manager %s is already cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
+      if (cancel) {
+        manager.cancel();
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel and removed" : "Removed",
+            QueryIdHelper.getQueryIdentifier(handle), manager);
       }
-
-      manager.cancel();
-      recentlyFinishedFragments.put(handle, 1);
-      managers.remove(handle);
       return true;
+    } else if (logger.isWarnEnabled()) {
+      logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
     }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e935819..d75668c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -325,7 +325,9 @@ public class WorkManager implements AutoCloseable {
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
-          workBus.removeFragmentManager(fragmentHandle);
+          if (!fragmentManager.isCancelled()) {
+            workBus.removeFragmentManager(fragmentHandle, false);
+          }
           indicateIfSafeToExit();
         }
       });

http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 2bbaf1b..972b56a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -193,7 +193,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
 
     // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
     // work bus.
-    final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+    final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true);
     if (removed) {
       return Acks.OK;
     }
@@ -217,7 +217,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
 
   private Ack resumeFragment(final FragmentHandle handle) {
     // resume a pending fragment
-    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
     if (manager != null) {
       manager.unpause();
       return Acks.OK;
@@ -237,14 +237,12 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
 
   private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
 
-    final FragmentManager manager =
-        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
 
-    FragmentExecutor executor;
     if (manager != null) {
       manager.receivingFragmentFinished(finishedReceiver.getReceiver());
     } else {
-      executor = bee.getFragmentRunner(finishedReceiver.getSender());
+      final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender());
       if (executor != null) {
         executor.receivingFragmentFinished(finishedReceiver.getReceiver());
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 7d1585b..17a5965 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -57,9 +57,4 @@ public class NonRootFragmentManager extends AbstractFragmentManager {
   public void receivingFragmentFinished(final FragmentHandle handle) {
     fragmentExecutor.receivingFragmentFinished(handle);
   }
-
-  @Override
-  public synchronized void cancel() {
-    super.cancel();
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index a24b0db..834f108 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -225,7 +225,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
   public void success(@Injectable WorkerBee bee, @Injectable final WorkEventBus workBus) throws Exception {
 
     new NonStrictExpectations() {{
-      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
       workBus.getFragmentManager( (FragmentHandle) any); result = manager;
     }};
 
@@ -273,7 +272,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
     updateTestCluster(1, newConfig);
 
     new NonStrictExpectations() {{
-      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
       workBus.getFragmentManager( (FragmentHandle) any); result = manager;
     }};
 
@@ -322,7 +320,6 @@ public class TestBitBitKerberos extends BaseTestQuery {
     updateTestCluster(1, newConfig);
 
     new NonStrictExpectations() {{
-      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = manager;
       workBus.getFragmentManager( (FragmentHandle) any); result = manager;
     }};
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c5af3aef/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
index 8c53915..1e8318f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java
@@ -105,7 +105,6 @@ public class TestBitRpc extends ExecTest {
 
 
     new NonStrictExpectations() {{
-      workBus.getFragmentManagerIfExists((FragmentHandle) any); result = fman;
       workBus.getFragmentManager( (FragmentHandle) any); result = fman;
     }};