You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/05/19 01:50:06 UTC

drill git commit: DRILL-4676: Foreman no longer uses a CountDownLatch and relies on the EventProcessor instead

Repository: drill
Updated Branches:
  refs/heads/master b075bf610 -> 1a8430eac


DRILL-4676: Foreman no longer uses a CountDownLatch and relies on the EventProcessor instead

As part of this change, Foreman.ResponseSendListener no longer calls Foreman.moveToState(), as doing so makes no difference
at that point (the final query result has already been sent to the client).

this closes #503


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1a8430ea
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1a8430ea
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1a8430ea

Branch: refs/heads/master
Commit: 1a8430eac99ccaae2ef67db6904efdc7478c9f6c
Parents: b075bf6
Author: adeneche <ad...@gmail.com>
Authored: Mon May 16 13:19:18 2016 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Wed May 18 10:23:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/EventProcessor.java |  81 ++++---
 .../apache/drill/exec/work/foreman/Foreman.java | 240 +++++++++----------
 .../drill/exec/work/foreman/QueryManager.java   |  12 +-
 3 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
index 617801b..08ec55e 100644
--- a/common/src/main/java/org/apache/drill/common/EventProcessor.java
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -20,62 +20,79 @@ package org.apache.drill.common;
 import java.util.LinkedList;
 
 /**
- * Process events serially.
- *
- * <p>Our use of listeners that deliver events directly can sometimes
+ * Process events serially.<br>
+ * <br>
+ * Our use of listeners that deliver events directly can sometimes
  * cause problems when events are delivered recursively in the middle of
  * event handling by the same object. This helper class can be used to
- * serialize events in such cases. If an event is being processed, arriving
- * events are queued. Once the current event handling is completed, the
- * next event on the queue is processed; this continues until the event
- * queue is empty. The first thread to arrive will process its own event
- * and all other events that arrive during that processing. Other threads
- * will just enqueue their events.</p>
+ * serialize events in such cases.<br>
+ * <br>
+ * All events are queued until {@link #start()} is called.
+ * The thread that calls {@link #start()} will process all events in the order they
+ * were added until the queue is empty. Other threads will just enqueue their events.<br>
+ * When the queue is empty, the first thread that adds an event will start processing
+ * the queue until it's empty again.
  *
  * @param <T> the event class
  */
 public abstract class EventProcessor<T> {
   private final LinkedList<T> queuedEvents = new LinkedList<>();
   private volatile boolean isProcessing = false;
+  private volatile boolean started = false;
 
   /**
-   * Constructor.
-   */
-  public EventProcessor() {
-  }
-
-  /**
-   * Send an event to the processor. If the processor is not busy, the event
-   * will be processed. If busy, the event will be queued to be processed after
-   * any prior events are processed.
+   * Send an event to the processor. The event will be queued to be processed after
+   * any prior events are processed, once processing actually starts.
    *
    * <p>If an event's processing causes an exception, it will be added to any
    * previous exceptions as a suppressed exception. Once all the currently queued
    * events have been processed, a single exception will be thrown.</p>
    *
    * @param newEvent the new event
+   *
+   * @throws RuntimeException if any exception is thrown while events are being processed
    */
   public void sendEvent(final T newEvent) {
     synchronized (queuedEvents) {
-      if (isProcessing) {
-        queuedEvents.addLast(newEvent);
+      queuedEvents.addLast(newEvent);
+      if (!started || isProcessing) {
         return;
       }
 
       isProcessing = true;
     }
 
+    processEvents();
+  }
+
+  /**
+   * Start processing events as soon as the queue isn't empty.<br>
+   * If the queue is not empty, this method will process all events already
+   * in the queue and any event that will be added while the queue is being processed.
+   *
+   * @throws RuntimeException if any exception is thrown while events are being processed
+   */
+  public void start() {
+    synchronized (queuedEvents) {
+      if (started) {
+        return;
+      }
+
+      started = true;
+      isProcessing = true;
+    }
+
+    processEvents();
+  }
+
+  /**
+   * Process all events in the queue until it's empty.
+   */
+  private void processEvents() {
     @SuppressWarnings("resource")
     final DeferredException deferredException = new DeferredException();
-    T event = newEvent;
     while (true) {
-      try {
-        processEvent(event);
-      } catch (Exception e) {
-        deferredException.addException(e);
-      } catch (AssertionError ae) {
-        deferredException.addException(new RuntimeException("Caught an assertion", ae));
-      }
+      T event;
 
       synchronized (queuedEvents) {
         if (queuedEvents.isEmpty()) {
@@ -85,6 +102,14 @@ public abstract class EventProcessor<T> {
 
         event = queuedEvents.removeFirst();
       }
+
+      try {
+        processEvent(event);
+      } catch (Exception e) {
+        deferredException.addException(e);
+      } catch (AssertionError ae) {
+        deferredException.addException(new RuntimeException("Caught an assertion", ae));
+      }
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e7defec..2829ac1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -134,8 +134,6 @@ public class Foreman implements Runnable {
 
   private volatile DistributedLease lease; // used to limit the number of concurrent queries
 
-  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
-  private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
@@ -169,7 +167,7 @@ public class Foreman implements Runnable {
 
     queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
     queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
-        drillbitContext.getClusterCoordinator(), stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
+        drillbitContext.getClusterCoordinator(), this);
 
     final OptionManager optionManager = queryContext.getOptions();
     queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
@@ -210,7 +208,7 @@ public class Foreman implements Runnable {
    */
   public void cancel() {
     // Note this can be called from outside of run() on another thread, or after run() completes
-    stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null);
+    addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
   }
 
   /**
@@ -305,7 +303,11 @@ public class Foreman implements Runnable {
        * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
        * to accept events.
        */
-      acceptExternalEvents.countDown();
+      try {
+        stateSwitch.start();
+      } catch (Exception ex) {
+        moveToState(QueryState.FAILED, ex);
+      }
 
       // If we received the resume signal before fragments are setup, the first call does not actually resume the
       // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
@@ -838,126 +840,128 @@ public class Foreman implements Runnable {
     }
   }
 
-  private class StateSwitch extends EventProcessor<StateEvent> {
-    public void moveToState(final QueryState newState, final Exception exception) {
-      sendEvent(new StateEvent(newState, exception));
-    }
-
-    @Override
-    protected void processEvent(final StateEvent event) {
-      final QueryState newState = event.newState;
-      final Exception exception = event.exception;
-
-      // TODO Auto-generated method stub
-      logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
-          exception);
-      switch (state) {
-      case ENQUEUED:
-        switch (newState) {
-          case FAILED:
-            Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
-            recordNewState(newState);
-            foremanResult.setFailed(exception);
-            foremanResult.close();
-            return;
-          case STARTING:
-            recordNewState(newState);
-            return;
-        }
-        break;
+  private void moveToState(final QueryState newState, final Exception exception) {
+    logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
+      exception);
+    switch (state) {
+    case ENQUEUED:
+      switch (newState) {
+      case FAILED:
+        Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
+        recordNewState(newState);
+        foremanResult.setFailed(exception);
+        foremanResult.close();
+        return;
       case STARTING:
-        if (newState == QueryState.RUNNING) {
-          recordNewState(QueryState.RUNNING);
-          return;
-        }
+        recordNewState(newState);
+        return;
+      }
+      break;
+    case STARTING:
+      if (newState == QueryState.RUNNING) {
+        recordNewState(QueryState.RUNNING);
+        return;
+      }
 
-        //$FALL-THROUGH$
+      //$FALL-THROUGH$
 
-      case RUNNING: {
+    case RUNNING: {
+      /*
+       * For cases that cancel executing fragments, we have to record the new
+       * state first, because the cancellation of the local root fragment will
+       * cause this to be called recursively.
+       */
+      switch (newState) {
+      case CANCELLATION_REQUESTED: {
+        assert exception == null;
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setCompleted(QueryState.CANCELED);
         /*
-         * For cases that cancel executing fragments, we have to record the new
-         * state first, because the cancellation of the local root fragment will
-         * cause this to be called recursively.
+         * We don't close the foremanResult until we've gotten
+         * acknowledgements, which happens below in the case for current state
+         * == CANCELLATION_REQUESTED.
          */
-        switch (newState) {
-        case CANCELLATION_REQUESTED: {
-          assert exception == null;
-          recordNewState(QueryState.CANCELLATION_REQUESTED);
-          queryManager.cancelExecutingFragments(drillbitContext);
-          foremanResult.setCompleted(QueryState.CANCELED);
-          /*
-           * We don't close the foremanResult until we've gotten
-           * acknowledgements, which happens below in the case for current state
-           * == CANCELLATION_REQUESTED.
-           */
-          return;
-        }
+        return;
+      }
 
-        case COMPLETED: {
-          assert exception == null;
-          recordNewState(QueryState.COMPLETED);
-          foremanResult.setCompleted(QueryState.COMPLETED);
-          foremanResult.close();
-          return;
-        }
+      case COMPLETED: {
+        assert exception == null;
+        recordNewState(QueryState.COMPLETED);
+        foremanResult.setCompleted(QueryState.COMPLETED);
+        foremanResult.close();
+        return;
+      }
 
-        case FAILED: {
-          assert exception != null;
-          recordNewState(QueryState.FAILED);
-          queryManager.cancelExecutingFragments(drillbitContext);
-          foremanResult.setFailed(exception);
-          foremanResult.close();
-          return;
-        }
+      case FAILED: {
+        assert exception != null;
+        recordNewState(QueryState.FAILED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setFailed(exception);
+        foremanResult.close();
+        return;
+      }
 
-        }
-        break;
       }
+      break;
+    }
 
-      case CANCELLATION_REQUESTED:
-        if ((newState == QueryState.CANCELED)
-            || (newState == QueryState.COMPLETED)
-            || (newState == QueryState.FAILED)) {
-
-          if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
-            if (newState == QueryState.FAILED) {
-              assert exception != null;
-              recordNewState(QueryState.FAILED);
-              foremanResult.setForceFailure(exception);
-            }
+    case CANCELLATION_REQUESTED:
+      if ((newState == QueryState.CANCELED)
+        || (newState == QueryState.COMPLETED)
+        || (newState == QueryState.FAILED)) {
+
+        if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+          if (newState == QueryState.FAILED) {
+            assert exception != null;
+            recordNewState(QueryState.FAILED);
+            foremanResult.setForceFailure(exception);
           }
-          /*
-           * These amount to a completion of the cancellation requests' cleanup;
-           * now we can clean up and send the result.
-           */
-          foremanResult.close();
         }
-        return;
-
-      case CANCELED:
-      case COMPLETED:
-      case FAILED:
-        logger
-            .warn(
-                "Dropping request to move to {} state as query is already at {} state (which is terminal).",
-                newState, state);
-        return;
+        /*
+         * These amount to a completion of the cancellation requests' cleanup;
+         * now we can clean up and send the result.
+         */
+        foremanResult.close();
       }
+      return;
+
+    case CANCELED:
+    case COMPLETED:
+    case FAILED:
+      logger
+        .warn(
+          "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+          newState, state);
+      return;
+    }
+
+    throw new IllegalStateException(String.format(
+      "Failure trying to change states: %s --> %s", state.name(),
+      newState.name()));
+  }
+
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    public void addEvent(final QueryState newState, final Exception exception) {
+      sendEvent(new StateEvent(newState, exception));
+    }
 
-      throw new IllegalStateException(String.format(
-          "Failure trying to change states: %s --> %s", state.name(),
-          newState.name()));
+    @Override
+    protected void processEvent(final StateEvent event) {
+      moveToState(event.newState, event.exception);
     }
   }
 
   /**
-   * Tells the foreman to move to a new state.
+   * Tells the foreman to move to a new state.<br>
+   * This will be added to the end of the event queue and will be processed once the foreman is ready
+   * to accept external events.
    *
    * @param newState the state to move to
    * @param exception if not null, the exception that drove this state transition (usually a failure)
    */
-  private void moveToState(final QueryState newState, final Exception exception) {
-    stateSwitch.moveToState(newState, exception);
+  public void addToEventQueue(final QueryState newState, final Exception exception) {
+    stateSwitch.addEvent(newState, exception);
   }
 
   private void recordNewState(final QueryState newState) {
@@ -1196,13 +1200,13 @@ public class Foreman implements Runnable {
 
     @Override
     public void failed(final RpcException ex) {
-      if (latch != null) {
+      if (latch != null) { // this block only applies to intermediate fragments
         fragmentSubmitFailures.addFailure(endpoint, ex);
         latch.countDown();
-      } else {
+      } else { // this block only applies to leaf fragments
         // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
         logger.debug("Failure while sending fragment.  Stopping query.", ex);
-        stateListener.moveToState(QueryState.FAILED, ex);
+        addToEventQueue(QueryState.FAILED, ex);
       }
     }
 
@@ -1217,28 +1221,6 @@ public class Foreman implements Runnable {
   }
 
   /**
-   * Provides gated access to state transitions.
-   *
-   * <p>The StateListener waits on a latch before delivery state transitions to the Foreman. The
-   * latch will be tripped when the Foreman is sufficiently set up that it can receive and process
-   * external events from other threads.
-   */
-  public class StateListener {
-    /**
-     * Move the Foreman to the specified new state.
-     *
-     * @param newState the state to move to
-     * @param ex if moving to a failure state, the exception that led to the failure; used for reporting
-     *   to the user
-     */
-    public void moveToState(final QueryState newState, final Exception ex) {
-      acceptExternalEvents.awaitUninterruptibly();
-
-      Foreman.this.moveToState(newState, ex);
-    }
-  }
-
-  /**
    * Listens for the status of the RPC response sent to the user for the query.
    */
   private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@@ -1246,13 +1228,11 @@ public class Foreman implements Runnable {
     public void failed(final RpcException ex) {
       logger.info("Failure while trying communicate query result to initiating client. " +
               "This would happen if a client is disconnected before response notice can be sent.", ex);
-      stateListener.moveToState(QueryState.FAILED, ex);
     }
 
     @Override
     public void interrupted(final InterruptedException e) {
       logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
-      stateListener.moveToState(QueryState.FAILED, e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index f4ca42b..b76fd7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
@@ -53,7 +52,6 @@ import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
-import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 
 import com.carrotsearch.hppc.IntObjectHashMap;
 import com.carrotsearch.hppc.predicates.IntObjectPredicate;
@@ -79,7 +77,6 @@ public class QueryManager implements AutoCloseable {
       .build();
 
   private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
-  private final StateListener stateListener;
   private final QueryId queryId;
   private final String stringQueryId;
   private final RunQuery runQuery;
@@ -108,10 +105,9 @@ public class QueryManager implements AutoCloseable {
   private final AtomicInteger finishedFragments = new AtomicInteger(0);
 
   public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
-      final ClusterCoordinator coordinator, final StateListener stateListener, final Foreman foreman) {
+      final ClusterCoordinator coordinator, final Foreman foreman) {
     this.queryId =  queryId;
     this.runQuery = runQuery;
-    this.stateListener = stateListener;
     this.foreman = foreman;
 
     stringQueryId = QueryIdHelper.getQueryId(queryId);
@@ -471,7 +467,7 @@ public class QueryManager implements AutoCloseable {
     final int remaining = totalNodes - finishedNodes;
     if (remaining == 0) {
       // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
-      stateListener.moveToState(QueryState.COMPLETED, null);
+      foreman.addToEventQueue(QueryState.COMPLETED, null);
     } else {
       logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
           this.fragmentDataSet.size() - finishedFragments.get());
@@ -494,7 +490,7 @@ public class QueryManager implements AutoCloseable {
         break;
 
       case FAILED:
-        stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
+        foreman.addToEventQueue(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
         // fall-through.
       case FINISHED:
       case CANCELLED:
@@ -548,7 +544,7 @@ public class QueryManager implements AutoCloseable {
       if (atLeastOneFailure) {
         logger.warn("Drillbits [{}] no longer registered in cluster.  Canceling query {}",
             failedNodeList, QueryIdHelper.getQueryId(queryId));
-        stateListener.moveToState(QueryState.FAILED,
+        foreman.addToEventQueue(QueryState.FAILED,
             new ForemanException(String.format("One more more nodes lost connectivity during query.  Identified nodes were [%s].",
                 failedNodeList)));
       }