You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/26 18:51:11 UTC

[5/9] drill git commit: DRILL-2502: Improve code safety by providing a generic event delivery mechanism. This fixes some problems with broken state transitions that happen under queryManager.cancelExecutingFragments(), which cause recursive entry into Foreman.moveToState().

DRILL-2502: Improve code safety by providing a generic event delivery mechanism.
This fixes some problems with broken state transitions that happen under
queryManager.cancelExecutingFragments(), which cause recursive entry into
Foreman.moveToState().

EventProcessor
- created
Foreman
- altered to use the EventProcessor for moveToState()


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7870df7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7870df7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7870df7

Branch: refs/heads/master
Commit: c7870df73eb270b0841a151024034b40b899be0e
Parents: bdeb344
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:20:06 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Mar 26 09:58:35 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/EventProcessor.java | 104 +++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java | 171 +++++++++++--------
 2 files changed, 207 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c7870df7/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
new file mode 100644
index 0000000..617801b
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import java.util.LinkedList;
+
+/**
+ * Process events serially.
+ *
+ * <p>Our use of listeners that deliver events directly can sometimes
+ * cause problems when events are delivered recursively in the middle of
+ * event handling by the same object. This helper class can be used to
+ * serialize events in such cases. If an event is being processed, arriving
+ * events are queued. Once the current event handling is completed, the
+ * next event on the queue is processed; this continues until the event
+ * queue is empty. The first thread to arrive will process its own event
+ * and all other events that arrive during that processing. Other threads
+ * will just enqueue their events.</p>
+ *
+ * @param <T> the event class
+ */
+public abstract class EventProcessor<T> {
+  private final LinkedList<T> queuedEvents = new LinkedList<>();
+  private volatile boolean isProcessing = false;
+
+  /**
+   * Constructor.
+   */
+  public EventProcessor() {
+  }
+
+  /**
+   * Send an event to the processor. If the processor is not busy, the event
+   * will be processed. If busy, the event will be queued to be processed after
+   * any prior events are processed.
+   *
+   * <p>If an event's processing causes an exception, it will be added to any
+   * previous exceptions as a suppressed exception. Once all the currently queued
+   * events have been processed, a single exception will be thrown.</p>
+   *
+   * @param newEvent the new event
+   */
+  public void sendEvent(final T newEvent) {
+    synchronized (queuedEvents) {
+      if (isProcessing) {
+        queuedEvents.addLast(newEvent);
+        return;
+      }
+
+      isProcessing = true;
+    }
+
+    @SuppressWarnings("resource")
+    final DeferredException deferredException = new DeferredException();
+    T event = newEvent;
+    while (true) {
+      try {
+        processEvent(event);
+      } catch (Exception e) {
+        deferredException.addException(e);
+      } catch (AssertionError ae) {
+        deferredException.addException(new RuntimeException("Caught an assertion", ae));
+      }
+
+      synchronized (queuedEvents) {
+        if (queuedEvents.isEmpty()) {
+          isProcessing = false;
+          break;
+        }
+
+        event = queuedEvents.removeFirst();
+      }
+    }
+
+    try {
+      deferredException.close();
+    } catch(Exception e) {
+      throw new RuntimeException("Exceptions caught during event processing", e);
+    }
+  }
+
+  /**
+   * Process a single event. Derived classes provide the implementation of this
+   * to process events.
+   *
+   * @param event the event to process
+   */
+  protected abstract void processEvent(T event);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c7870df7/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 32fd650..285b75a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -118,6 +119,7 @@ public class Foreman implements Runnable {
   private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
   private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
+  private final StateSwitch stateSwitch = new StateSwitch();
   private final ForemanResult foremanResult = new ForemanResult();
 
   /**
@@ -644,87 +646,120 @@ public class Foreman implements Runnable {
     }
   }
 
-  /**
-   * Tells the foreman to move to a new state.
-   *
-   * @param newState the state to move to
-   * @param exception if not null, the exception that drove this state transition (usually a failure)
-   */
-  private synchronized void moveToState(final QueryState newState, final Exception exception) {
-    logger.info("State change requested.  {} --> {}", state, newState, exception);
-    switch(state) {
-    case PENDING:
-      if (newState == QueryState.RUNNING) {
-        recordNewState(QueryState.RUNNING);
-        return;
-      }
+  private static class StateEvent {
+    final QueryState newState;
+    final Exception exception;
 
-      //$FALL-THROUGH$
+    StateEvent(final QueryState newState, final Exception exception) {
+      this.newState = newState;
+      this.exception = exception;
+    }
+  }
 
-    case RUNNING: {
-      /*
-       * For cases that cancel executing fragments, we have to record the new state first, because
-       * the cancellation of the local root fragment will cause this to be called recursively.
-       */
-      switch(newState) {
-      case CANCELLATION_REQUESTED: {
-        assert exception == null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.CANCELLATION_REQUESTED);
-        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
-        foremanResult.setCompleted(QueryState.CANCELED);
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    public void moveToState(final QueryState newState, final Exception exception) {
+      sendEvent(new StateEvent(newState, exception));
+    }
+
+    @Override
+    protected void processEvent(StateEvent event) {
+      final QueryState newState = event.newState;
+      final Exception exception = event.exception;
+
+      // Log the requested transition, then dispatch on the current state.
+      logger.info("State change requested.  {} --> {}", state, newState,
+          exception);
+      switch (state) {
+      case PENDING:
+        if (newState == QueryState.RUNNING) {
+          recordNewState(QueryState.RUNNING);
+          return;
+        }
+
+        //$FALL-THROUGH$
+
+      case RUNNING: {
         /*
-         * We don't close the foremanResult until we've gotten acknowledgements, which
-         * happens below in the case for current state == CANCELLATION_REQUESTED.
+         * For cases that cancel executing fragments, we have to record the new
+         * state first, because the cancellation of the local root fragment will
+         * cause this to be called recursively.
          */
-        return;
-      }
+        switch (newState) {
+        case CANCELLATION_REQUESTED: {
+          assert exception == null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.CANCELLATION_REQUESTED);
+          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          foremanResult.setCompleted(QueryState.CANCELED);
+          /*
+           * We don't close the foremanResult until we've gotten
+           * acknowledgements, which happens below in the case for current state
+           * == CANCELLATION_REQUESTED.
+           */
+          return;
+        }
 
-      case COMPLETED: {
-        assert exception == null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.COMPLETED);
-        foremanResult.setCompleted(QueryState.COMPLETED);
-        foremanResult.close();
-        return;
-      }
+        case COMPLETED: {
+          assert exception == null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.COMPLETED);
+          foremanResult.setCompleted(QueryState.COMPLETED);
+          foremanResult.close();
+          return;
+        }
 
-      case FAILED: {
-        assert exception != null;
-        queryManager.markEndTime();
-        recordNewState(QueryState.FAILED);
-        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      }
+        case FAILED: {
+          assert exception != null;
+          queryManager.markEndTime();
+          recordNewState(QueryState.FAILED);
+          queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+          foremanResult.setFailed(exception);
+          foremanResult.close();
+          return;
+        }
 
-      default:
-        throw new IllegalStateException("illegal transition from RUNNING to " + newState);
+        default:
+          throw new IllegalStateException("illegal transition from RUNNING to "
+              + newState);
+        }
       }
-    }
 
-    case CANCELLATION_REQUESTED:
-      if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
-          || (newState == QueryState.FAILED)) {
-        /*
-         * These amount to a completion of the cancellation requests' cleanup; now we
-         * can clean up and send the result.
-         */
-        foremanResult.close();
+      case CANCELLATION_REQUESTED:
+        if ((newState == QueryState.CANCELED)
+            || (newState == QueryState.COMPLETED)
+            || (newState == QueryState.FAILED)) {
+          /*
+           * These amount to a completion of the cancellation requests' cleanup;
+           * now we can clean up and send the result.
+           */
+          foremanResult.close();
+        }
+        return;
+
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger
+            .warn(
+                "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+                newState, state);
+        return;
       }
-      return;
 
-    case CANCELED:
-    case COMPLETED:
-    case FAILED:
-      logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
-          newState, state);
-      return;
+      throw new IllegalStateException(String.format(
+          "Failure trying to change states: %s --> %s", state.name(),
+          newState.name()));
     }
+  }
 
-    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
-        state.name(), newState.name()));
+  /**
+   * Tells the foreman to move to a new state.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  private void moveToState(final QueryState newState, final Exception exception) {
+    stateSwitch.moveToState(newState, exception);
   }
 
   private void recordNewState(final QueryState newState) {