You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/26 18:51:11 UTC
[5/9] drill git commit: DRILL-2502: Improve code safety by providing
a generic event delivery mechanism. This fixes some problems with broken state
transitions that happen under queryManager.cancelExecutingFragments(),
which cause recursive entry into Foreman.moveToState().
DRILL-2502: Improve code safety by providing a generic event delivery mechanism
This fixes some problems with broken state transitions that happen under
queryManager.cancelExecutingFragments(), which cause recursive entry into
Foreman.moveToState().
EventProcessor
- created
Foreman
- altered to use the EventProcessor for moveToState()
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c7870df7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c7870df7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c7870df7
Branch: refs/heads/master
Commit: c7870df73eb270b0841a151024034b40b899be0e
Parents: bdeb344
Author: Chris Westin <cw...@yahoo.com>
Authored: Tue Mar 24 16:20:06 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Mar 26 09:58:35 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/EventProcessor.java | 104 +++++++++++
.../apache/drill/exec/work/foreman/Foreman.java | 171 +++++++++++--------
2 files changed, 207 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c7870df7/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
new file mode 100644
index 0000000..617801b
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common;
+
+import java.util.LinkedList;
+
+/**
+ * Process events serially.
+ *
+ * <p>Our use of listeners that deliver events directly can sometimes
+ * cause problems when events are delivered recursively in the middle of
+ * event handling by the same object. This helper class can be used to
+ * serialize events in such cases. If an event is being processed, arriving
+ * events are queued. Once the current event handling is completed, the
+ * next event on the queue is processed; this continues until the event
+ * queue is empty. The first thread to arrive will process its own event
+ * and all other events that arrive during that processing. Other threads
+ * will just enqueue their events.</p>
+ *
+ * <p>A call to {@link #sendEvent} made while an event is being processed is
+ * queued no matter which thread makes it, so a recursive call from inside
+ * {@link #processEvent} returns immediately and its event is handled only
+ * after the current one has finished.</p>
+ *
+ * @param <T> the event class
+ */
+public abstract class EventProcessor<T> {
+ private final LinkedList<T> queuedEvents = new LinkedList<>(); // FIFO of pending events; also the monitor that guards isProcessing
+ private volatile boolean isProcessing = false; // true while a caller of sendEvent() is draining events; only touched inside synchronized (queuedEvents)
+
+ /**
+ * Constructor.
+ */
+ public EventProcessor() {
+ }
+
+ /**
+ * Send an event to the processor. If the processor is not busy, the event
+ * will be processed. If busy, the event will be queued to be processed after
+ * any prior events are processed.
+ *
+ * <p>If an event's processing causes an exception, it will be added to any
+ * previous exceptions as a suppressed exception. Once all the currently queued
+ * events have been processed, a single exception will be thrown.</p>
+ *
+ * <p>That exception is thrown only to the caller that is draining the queue.
+ * A caller whose event was merely queued has already returned and receives
+ * no indication of how its event fared.</p>
+ *
+ * <p>NOTE(review): only {@code Exception} and {@code AssertionError} are
+ * caught around {@link #processEvent}. Any other {@code Throwable} leaves this
+ * method with {@code isProcessing} still set, after which every event sent is
+ * queued and none is processed. Confirm whether that case needs handling.</p>
+ *
+ * @param newEvent the new event
+ * @throws RuntimeException wrapping whatever {@code deferredException.close()}
+ *   throws; per the description above this is expected when at least one
+ *   event failed (DeferredException is not shown here, so verify)
+ */
+ public void sendEvent(final T newEvent) {
+ synchronized (queuedEvents) {
+ if (isProcessing) { // another call (possibly a recursive one on this thread) is already draining
+ queuedEvents.addLast(newEvent);
+ return; // the draining caller will pick this event up later
+ }
+
+ isProcessing = true; // this caller becomes the one that drains the queue
+ }
+
+ @SuppressWarnings("resource")
+ final DeferredException deferredException = new DeferredException();
+ T event = newEvent;
+ while (true) {
+ try {
+ processEvent(event); // runs without the lock held, so senders can enqueue meanwhile
+ } catch (Exception e) {
+ deferredException.addException(e); // remember the failure but keep draining
+ } catch (AssertionError ae) {
+ deferredException.addException(new RuntimeException("Caught an assertion", ae));
+ }
+
+ synchronized (queuedEvents) {
+ if (queuedEvents.isEmpty()) {
+ isProcessing = false; // cleared under the same lock as the emptiness check, so no queued event is stranded
+ break;
+ }
+
+ event = queuedEvents.removeFirst(); // oldest pending event first
+ }
+ }
+
+ try {
+ deferredException.close();
+ } catch(Exception e) {
+ throw new RuntimeException("Exceptions caught during event processing", e);
+ }
+ }
+
+ /**
+ * Process a single event. Derived classes provide the implementation of this
+ * to process events.
+ *
+ * <p>{@link #sendEvent} invokes this one event at a time and without holding
+ * the queue lock. An {@code Exception} or {@code AssertionError} thrown here
+ * is collected by {@code sendEvent} and reported after the queue has been
+ * drained; it does not stop later events from being processed.</p>
+ *
+ * @param event the event to process
+ */
+ protected abstract void processEvent(T event);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c7870df7/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 32fd650..285b75a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
@@ -118,6 +119,7 @@ public class Foreman implements Runnable {
private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
+ private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
/**
@@ -644,87 +646,120 @@ public class Foreman implements Runnable {
}
}
- /**
- * Tells the foreman to move to a new state.
- *
- * @param newState the state to move to
- * @param exception if not null, the exception that drove this state transition (usually a failure)
- */
- private synchronized void moveToState(final QueryState newState, final Exception exception) {
- logger.info("State change requested. {} --> {}", state, newState, exception);
- switch(state) {
- case PENDING:
- if (newState == QueryState.RUNNING) {
- recordNewState(QueryState.RUNNING);
- return;
- }
+ private static class StateEvent { // holder for one state-change request; the event type handled by StateSwitch
+ final QueryState newState; // the state being requested
+ final Exception exception; // the exception that drove the request, or null when there is none
- //$FALL-THROUGH$
+ StateEvent(final QueryState newState, final Exception exception) {
+ this.newState = newState;
+ this.exception = exception;
+ }
+ }
- case RUNNING: {
- /*
- * For cases that cancel executing fragments, we have to record the new state first, because
- * the cancellation of the local root fragment will cause this to be called recursively.
- */
- switch(newState) {
- case CANCELLATION_REQUESTED: {
- assert exception == null;
- queryManager.markEndTime();
- recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
- foremanResult.setCompleted(QueryState.CANCELED);
+ private class StateSwitch extends EventProcessor<StateEvent> { // funnels state-change requests through EventProcessor so they are handled one at a time
+ public void moveToState(final QueryState newState, final Exception exception) {
+ sendEvent(new StateEvent(newState, exception)); // queued instead of run if a transition is already being processed
+ }
+
+ @Override
+ protected void processEvent(StateEvent event) {
+ final QueryState newState = event.newState;
+ final Exception exception = event.exception;
+
+ // Apply one requested transition, dispatching on the current value of state.
+ logger.info("State change requested. {} --> {}", state, newState,
+ exception);
+ switch (state) {
+ case PENDING:
+ if (newState == QueryState.RUNNING) {
+ recordNewState(QueryState.RUNNING);
+ return;
+ }
+
+ //$FALL-THROUGH$
+
+ case RUNNING: {
/*
- * We don't close the foremanResult until we've gotten acknowledgements, which
- * happens below in the case for current state == CANCELLATION_REQUESTED.
+ * For cases that cancel executing fragments, we have to record the new
+ * state first, because the cancellation of the local root fragment will
+ * cause this to be called recursively.
*/
- return;
- }
+ switch (newState) {
+ case CANCELLATION_REQUESTED: {
+ assert exception == null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setCompleted(QueryState.CANCELED);
+ /*
+ * We don't close the foremanResult until we've gotten
+ * acknowledgements, which happens below in the case for current state
+ * == CANCELLATION_REQUESTED.
+ */
+ return;
+ }
- case COMPLETED: {
- assert exception == null;
- queryManager.markEndTime();
- recordNewState(QueryState.COMPLETED);
- foremanResult.setCompleted(QueryState.COMPLETED);
- foremanResult.close();
- return;
- }
+ case COMPLETED: {
+ assert exception == null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.COMPLETED);
+ foremanResult.setCompleted(QueryState.COMPLETED);
+ foremanResult.close();
+ return;
+ }
- case FAILED: {
- assert exception != null;
- queryManager.markEndTime();
- recordNewState(QueryState.FAILED);
- queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
- foremanResult.setFailed(exception);
- foremanResult.close();
- return;
- }
+ case FAILED: {
+ assert exception != null;
+ queryManager.markEndTime();
+ recordNewState(QueryState.FAILED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ }
- default:
- throw new IllegalStateException("illegal transition from RUNNING to " + newState);
+ default:
+ throw new IllegalStateException("illegal transition from RUNNING to "
+ + newState);
+ }
}
- }
- case CANCELLATION_REQUESTED:
- if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
- || (newState == QueryState.FAILED)) {
- /*
- * These amount to a completion of the cancellation requests' cleanup; now we
- * can clean up and send the result.
- */
- foremanResult.close();
+ case CANCELLATION_REQUESTED:
+ if ((newState == QueryState.CANCELED)
+ || (newState == QueryState.COMPLETED)
+ || (newState == QueryState.FAILED)) {
+ /*
+ * These amount to a completion of the cancellation requests' cleanup;
+ * now we can clean up and send the result.
+ */
+ foremanResult.close();
+ }
+ return; // any other requested state is ignored without logging while state is CANCELLATION_REQUESTED
+
+ case CANCELED:
+ case COMPLETED:
+ case FAILED:
+ logger
+ .warn(
+ "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+ newState, state);
+ return;
}
- return;
- case CANCELED:
- case COMPLETED:
- case FAILED:
- logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
- newState, state);
- return;
+ throw new IllegalStateException(String.format( // reached only when state matched none of the cases above
+ "Failure trying to change states: %s --> %s", state.name(),
+ newState.name()));
}
+ }
- throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
- state.name(), newState.name()));
+ /**
+ * Tells the foreman to move to a new state.
+ *
+ * <p>The request is handed to {@code stateSwitch}, which wraps it in a
+ * {@code StateEvent} and passes it to {@code EventProcessor.sendEvent()}. If
+ * another transition is being processed at that moment, the request is only
+ * queued, so this method can return before the transition has been applied.</p>
+ *
+ * @param newState the state to move to
+ * @param exception if not null, the exception that drove this state transition (usually a failure)
+ */
+ private void moveToState(final QueryState newState, final Exception exception) {
+ stateSwitch.moveToState(newState, exception);
+ }
private void recordNewState(final QueryState newState) {