You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/01/15 23:48:34 UTC

drill git commit: DRILL-2004: Foreman should account for fragment cancellations or query hangs

Repository: drill
Updated Branches:
  refs/heads/master 69db15ebb -> 937802814


DRILL-2004: Foreman should account for fragment cancellations or query hangs


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/93780281
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/93780281
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/93780281

Branch: refs/heads/master
Commit: 93780281439d8c0a4949dfed054ad34225970665
Parents: 69db15e
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Tue Jan 13 18:37:12 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Thu Jan 15 14:48:13 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/rpc/control/WorkEventBus.java    | 14 +---
 .../apache/drill/exec/work/foreman/Foreman.java | 14 ++--
 .../drill/exec/work/foreman/QueryManager.java   | 13 ++--
 .../exec/work/fragment/FragmentExecutor.java    | 79 ++++++++++++++------
 .../work/fragment/StateTransitionException.java | 42 +++++++++++
 5 files changed, 118 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/93780281/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index b9f0a26..d6b8637 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -54,7 +54,7 @@ public class WorkEventBus {
   }
 
   public void removeFragmentStatusListener(QueryId queryId) {
-    logger.debug("Removing framgent status listener for queryId {}.", queryId);
+    logger.debug("Removing fragment status listener for queryId {}.", queryId);
     listeners.remove(queryId);
   }
 
@@ -70,10 +70,7 @@ public class WorkEventBus {
   public void status(FragmentStatus status) {
     FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
     if (l == null) {
-
-      logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.",
-          status.getHandle());
-      return;
+      logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status);
     } else {
       l.statusUpdate(status);
     }
@@ -102,17 +99,12 @@ public class WorkEventBus {
 
     // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
     FragmentManager m = managers.get(handle);
-    if(m != null){
+    if(m != null) {
       return m;
     }
     throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
   }
 
-  public void cancelFragment(FragmentHandle handle) {
-    logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle));
-    removeFragmentManager(handle);
-  }
-
   public void removeFragmentManager(FragmentHandle handle) {
     logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
     recentlyFinishedFragments.put(handle,  1);

http://git-wip-us.apache.org/repos/asf/drill/blob/93780281/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5efc9fa..e10a6aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -155,6 +155,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
   }
 
   private void cleanup(QueryResult result) {
+    logger.info("foreman cleaning up - status: {}", queryManager.getStatus());
+
     bee.retireForeman(this);
     context.getWorkBus().removeFragmentStatusListener(queryId);
     context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
@@ -384,7 +386,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
    * @return
    */
   private synchronized boolean moveToState(QueryState newState, Exception exception){
-    logger.debug("State change requested.  {} --> {}", state, newState);
+    logger.info("State change requested.  {} --> {}", state, newState, exception);
     outside: switch(state) {
 
     case PENDING:
@@ -413,7 +415,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
             .setIsLastChunk(true) //
             .build();
 
-        cleanup(result);
+        // immediately notify client that cancellation is taking place; final clean-up happens when the foreman reaches
+        // a terminal state (completed, failed)
+        initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
         return true;
       }
 
@@ -454,7 +458,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
     case COMPLETED:
     case FAILED: {
       // no op.
-      logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
+      logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
       return false;
     }
 
@@ -635,9 +639,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
 
   public class StateListener {
     public boolean moveToState(QueryState newState, Exception ex){
-      try{
+      try {
         acceptExternalEvents.await();
-      }catch(InterruptedException e){
+      } catch(InterruptedException e){
         logger.warn("Interrupted while waiting to move state.", e);
         return false;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/93780281/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index d4c87d4..2de3592 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -21,11 +21,13 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -51,7 +53,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   private final QueryId queryId;
 
   public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
-    super();
     this.stateListener = stateListener;
     this.queryId =  id;
     this.remainingFragmentCount = new AtomicInteger(0);
@@ -87,13 +88,14 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       updateFragmentStatus(status);
       break;
     case CANCELLED:
-      // we don't care about cancellation messages since we're the only entity that should drive cancellations.
+      //TODO: define a new query state to distinguish the state of early termination from cancellation
+      fragmentDone(status);
       break;
     case FAILED:
       stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
       break;
     case FINISHED:
-      finished(status);
+      fragmentDone(status);
       break;
     case RUNNING:
       updateFragmentStatus(status);
@@ -107,11 +109,11 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     this.status.updateFragmentStatus(status);
   }
 
-  private void finished(FragmentStatus status){
+  private void fragmentDone(FragmentStatus status){
     this.status.incrementFinishedFragments();
     int remaining = remainingFragmentCount.decrementAndGet();
     updateFragmentStatus(status);
-
+    logger.debug("waiting for {} fragments", remaining);
     if(remaining == 0){
       stateListener.moveToState(QueryState.COMPLETED, null);
     }
@@ -119,6 +121,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
 
   public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
     remainingFragmentCount.set(countOfNonRootFragments + 1);
+    logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
     status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
     this.status.setTotalFragments(countOfNonRootFragments + 1);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/93780281/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 27038d3..9ffe643 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
-import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -71,16 +70,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     updateState(FragmentState.CANCELLED);
     logger.debug("Cancelled Fragment {}", context.getHandle());
     context.cancel();
-
-    if (executionThread != null) {
-      executionThread.interrupt();
-    }
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
-    updateState(FragmentState.CANCELLED);
-    context.cancel();
-    root.receivingFragmentFinished(handle);
+    cancel();
+    if (root != null) {
+      root.receivingFragmentFinished(handle);
+    }
   }
 
   public UserClientConnection getClient() {
@@ -104,8 +100,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);
 
       logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
-      if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) {
-        internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+      if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
+        logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
         return;
       }
 
@@ -117,7 +113,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
             closeOutResources(false);
           } else {
             closeOutResources(true); // make sure to close out resources before we report success.
-            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+            updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
           }
 
           break;
@@ -166,23 +162,60 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     listener.fail(context.getHandle(), "Failure while running fragment.", excep);
   }
 
-  private void updateState(FragmentState update) {
-    state.set(update.getNumber());
-    listener.stateChanged(context.getHandle(), update);
+  /**
+   * Unconditionally sets the fragment state to the given state and notifies the listener.
+   * @param to target state
+   */
+  protected void updateState(FragmentState to) {;
+    state.set(to.getNumber());
+    listener.stateChanged(context.getHandle(), to);
   }
 
-  private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
-    boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
-    if (!success && exceptionOnFailure) {
-      internalFail(new RuntimeException(String.format(
-          "State was different than expected.  Attempting to update state from %s to %s however current state was %s.",
-          current.name(), update.name(), FragmentState.valueOf(state.get()))));
-      return false;
+  /**
+   * Updates the fragment state only if the current state matches the expected state.
+   *
+   * @param expected expected current state
+   * @param to target state
+   * @return true only if update succeeds
+   */
+  protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) {
+    boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
+    if (success) {
+      listener.stateChanged(context.getHandle(), to);
+    } else {
+      logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
+          expected.name(), to.name(), FragmentState.valueOf(state.get()));
     }
-    listener.stateChanged(context.getHandle(), update);
-    return true;
+    return success;
   }
 
+  /**
+   * Returns true if the fragment is in a terminal state (CANCELLED, FAILED or FINISHED).
+   */
+  protected boolean isCompleted() {
+    return state.get() == FragmentState.CANCELLED_VALUE
+        || state.get() == FragmentState.FAILED_VALUE
+        || state.get() == FragmentState.FINISHED_VALUE;
+  }
+
+  /**
+   * Updates the state if the current state matches the expected one; if the transition fails and the fragment is
+   * not already in a terminal state, fails the fragment.
+   *
+   * @param expected current expected state
+   * @param to target state
+   * @return true only if update succeeds
+   */
+  protected boolean updateStateOrFail(FragmentState expected, FragmentState to) {
+    final boolean updated = checkAndUpdateState(expected, to);
+    if (!updated && !isCompleted()) {
+      final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+      internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
+    }
+    return updated;
+  }
+
+
   @Override
   public int compareTo(Object o) {
     return o.hashCode() - this.hashCode();

http://git-wip-us.apache.org/repos/asf/drill/blob/93780281/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
new file mode 100644
index 0000000..7155d43
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class StateTransitionException extends DrillException {
+  public StateTransitionException() {
+    super();
+  }
+
+  public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public StateTransitionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public StateTransitionException(String message) {
+    super(message);
+  }
+
+  public StateTransitionException(Throwable cause) {
+    super(cause);
+  }
+}