You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/05/19 01:50:06 UTC
drill git commit: DRILL-4676: Foreman no longer uses a CountDownLatch
and relies on the EventProcessor instead
Repository: drill
Updated Branches:
refs/heads/master b075bf610 -> 1a8430eac
DRILL-4676: Foreman no longer uses a CountDownLatch and relies on the EventProcessor instead
As part of this change, Foreman.ResponseSendListener no longer calls Foreman.moveToState(), as it doesn't make any difference
at this point.
this closes #503
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1a8430ea
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1a8430ea
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1a8430ea
Branch: refs/heads/master
Commit: 1a8430eac99ccaae2ef67db6904efdc7478c9f6c
Parents: b075bf6
Author: adeneche <ad...@gmail.com>
Authored: Mon May 16 13:19:18 2016 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Wed May 18 10:23:09 2016 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/EventProcessor.java | 81 ++++---
.../apache/drill/exec/work/foreman/Foreman.java | 240 +++++++++----------
.../drill/exec/work/foreman/QueryManager.java | 12 +-
3 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/common/src/main/java/org/apache/drill/common/EventProcessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/EventProcessor.java b/common/src/main/java/org/apache/drill/common/EventProcessor.java
index 617801b..08ec55e 100644
--- a/common/src/main/java/org/apache/drill/common/EventProcessor.java
+++ b/common/src/main/java/org/apache/drill/common/EventProcessor.java
@@ -20,62 +20,79 @@ package org.apache.drill.common;
import java.util.LinkedList;
/**
- * Process events serially.
- *
- * <p>Our use of listeners that deliver events directly can sometimes
+ * Process events serially.<br>
+ * <br>
+ * Our use of listeners that deliver events directly can sometimes
* cause problems when events are delivered recursively in the middle of
* event handling by the same object. This helper class can be used to
- * serialize events in such cases. If an event is being processed, arriving
- * events are queued. Once the current event handling is completed, the
- * next event on the queue is processed; this continues until the event
- * queue is empty. The first thread to arrive will process its own event
- * and all other events that arrive during that processing. Other threads
- * will just enqueue their events.</p>
+ * serialize events in such cases.<br>
+ * <br>
+ * All events are queued until {@link #start()} is called.
+ * The thread that calls {@link #start()} will process all events in the order they
+ * were added until the queue is empty. Other threads will just enqueue their events.<br>
+ * When the queue is empty, the first thread that adds an event will start processing
+ * the queue until it's empty again.
*
* @param <T> the event class
*/
public abstract class EventProcessor<T> {
private final LinkedList<T> queuedEvents = new LinkedList<>();
private volatile boolean isProcessing = false;
+ private volatile boolean started = false;
/**
- * Constructor.
- */
- public EventProcessor() {
- }
-
- /**
- * Send an event to the processor. If the processor is not busy, the event
- * will be processed. If busy, the event will be queued to be processed after
- * any prior events are processed.
+ * Send an event to the processor. The event will be queued to be processed after
+ * any prior events are processed, once processing actually starts.
*
* <p>If an event's processing causes an exception, it will be added to any
* previous exceptions as a suppressed exception. Once all the currently queued
* events have been processed, a single exception will be thrown.</p>
*
* @param newEvent the new event
+ *
+ * @throws RuntimeException if any exception is thrown while events are being processed
*/
public void sendEvent(final T newEvent) {
synchronized (queuedEvents) {
- if (isProcessing) {
- queuedEvents.addLast(newEvent);
+ queuedEvents.addLast(newEvent);
+ if (!started || isProcessing) {
return;
}
isProcessing = true;
}
+ processEvents();
+ }
+
+ /**
+ * Start processing events as soon as the queue isn't empty.<br>
+ * If the queue is not empty, this method will process all events already
+ * in the queue and any event that will be added while the queue is being processed.
+ *
+ * @throws RuntimeException if any exception is thrown while events are being processed
+ */
+ public void start() {
+ synchronized (queuedEvents) {
+ if (started) {
+ return;
+ }
+
+ started = true;
+ isProcessing = true;
+ }
+
+ processEvents();
+ }
+
+ /**
+ * Process all events in the queue until it's empty.
+ */
+ private void processEvents() {
@SuppressWarnings("resource")
final DeferredException deferredException = new DeferredException();
- T event = newEvent;
while (true) {
- try {
- processEvent(event);
- } catch (Exception e) {
- deferredException.addException(e);
- } catch (AssertionError ae) {
- deferredException.addException(new RuntimeException("Caught an assertion", ae));
- }
+ T event;
synchronized (queuedEvents) {
if (queuedEvents.isEmpty()) {
@@ -85,6 +102,14 @@ public abstract class EventProcessor<T> {
event = queuedEvents.removeFirst();
}
+
+ try {
+ processEvent(event);
+ } catch (Exception e) {
+ deferredException.addException(e);
+ } catch (AssertionError ae) {
+ deferredException.addException(new RuntimeException("Caught an assertion", ae));
+ }
}
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e7defec..2829ac1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -134,8 +134,6 @@ public class Foreman implements Runnable {
private volatile DistributedLease lease; // used to limit the number of concurrent queries
- private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
- private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
@@ -169,7 +167,7 @@ public class Foreman implements Runnable {
queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
- drillbitContext.getClusterCoordinator(), stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
+ drillbitContext.getClusterCoordinator(), this);
final OptionManager optionManager = queryContext.getOptions();
queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
@@ -210,7 +208,7 @@ public class Foreman implements Runnable {
*/
public void cancel() {
// Note this can be called from outside of run() on another thread, or after run() completes
- stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null);
+ addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
}
/**
@@ -305,7 +303,11 @@ public class Foreman implements Runnable {
* would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
* to accept events.
*/
- acceptExternalEvents.countDown();
+ try {
+ stateSwitch.start();
+ } catch (Exception ex) {
+ moveToState(QueryState.FAILED, ex);
+ }
// If we received the resume signal before fragments are setup, the first call does not actually resume the
// fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
@@ -838,126 +840,128 @@ public class Foreman implements Runnable {
}
}
- private class StateSwitch extends EventProcessor<StateEvent> {
- public void moveToState(final QueryState newState, final Exception exception) {
- sendEvent(new StateEvent(newState, exception));
- }
-
- @Override
- protected void processEvent(final StateEvent event) {
- final QueryState newState = event.newState;
- final Exception exception = event.exception;
-
- // TODO Auto-generated method stub
- logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
- exception);
- switch (state) {
- case ENQUEUED:
- switch (newState) {
- case FAILED:
- Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
- recordNewState(newState);
- foremanResult.setFailed(exception);
- foremanResult.close();
- return;
- case STARTING:
- recordNewState(newState);
- return;
- }
- break;
+ private void moveToState(final QueryState newState, final Exception exception) {
+ logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
+ exception);
+ switch (state) {
+ case ENQUEUED:
+ switch (newState) {
+ case FAILED:
+ Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
+ recordNewState(newState);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
case STARTING:
- if (newState == QueryState.RUNNING) {
- recordNewState(QueryState.RUNNING);
- return;
- }
+ recordNewState(newState);
+ return;
+ }
+ break;
+ case STARTING:
+ if (newState == QueryState.RUNNING) {
+ recordNewState(QueryState.RUNNING);
+ return;
+ }
- //$FALL-THROUGH$
+ //$FALL-THROUGH$
- case RUNNING: {
+ case RUNNING: {
+ /*
+ * For cases that cancel executing fragments, we have to record the new
+ * state first, because the cancellation of the local root fragment will
+ * cause this to be called recursively.
+ */
+ switch (newState) {
+ case CANCELLATION_REQUESTED: {
+ assert exception == null;
+ recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext);
+ foremanResult.setCompleted(QueryState.CANCELED);
/*
- * For cases that cancel executing fragments, we have to record the new
- * state first, because the cancellation of the local root fragment will
- * cause this to be called recursively.
+ * We don't close the foremanResult until we've gotten
+ * acknowledgements, which happens below in the case for current state
+ * == CANCELLATION_REQUESTED.
*/
- switch (newState) {
- case CANCELLATION_REQUESTED: {
- assert exception == null;
- recordNewState(QueryState.CANCELLATION_REQUESTED);
- queryManager.cancelExecutingFragments(drillbitContext);
- foremanResult.setCompleted(QueryState.CANCELED);
- /*
- * We don't close the foremanResult until we've gotten
- * acknowledgements, which happens below in the case for current state
- * == CANCELLATION_REQUESTED.
- */
- return;
- }
+ return;
+ }
- case COMPLETED: {
- assert exception == null;
- recordNewState(QueryState.COMPLETED);
- foremanResult.setCompleted(QueryState.COMPLETED);
- foremanResult.close();
- return;
- }
+ case COMPLETED: {
+ assert exception == null;
+ recordNewState(QueryState.COMPLETED);
+ foremanResult.setCompleted(QueryState.COMPLETED);
+ foremanResult.close();
+ return;
+ }
- case FAILED: {
- assert exception != null;
- recordNewState(QueryState.FAILED);
- queryManager.cancelExecutingFragments(drillbitContext);
- foremanResult.setFailed(exception);
- foremanResult.close();
- return;
- }
+ case FAILED: {
+ assert exception != null;
+ recordNewState(QueryState.FAILED);
+ queryManager.cancelExecutingFragments(drillbitContext);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ }
- }
- break;
}
+ break;
+ }
- case CANCELLATION_REQUESTED:
- if ((newState == QueryState.CANCELED)
- || (newState == QueryState.COMPLETED)
- || (newState == QueryState.FAILED)) {
-
- if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
- if (newState == QueryState.FAILED) {
- assert exception != null;
- recordNewState(QueryState.FAILED);
- foremanResult.setForceFailure(exception);
- }
+ case CANCELLATION_REQUESTED:
+ if ((newState == QueryState.CANCELED)
+ || (newState == QueryState.COMPLETED)
+ || (newState == QueryState.FAILED)) {
+
+ if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+ if (newState == QueryState.FAILED) {
+ assert exception != null;
+ recordNewState(QueryState.FAILED);
+ foremanResult.setForceFailure(exception);
}
- /*
- * These amount to a completion of the cancellation requests' cleanup;
- * now we can clean up and send the result.
- */
- foremanResult.close();
}
- return;
-
- case CANCELED:
- case COMPLETED:
- case FAILED:
- logger
- .warn(
- "Dropping request to move to {} state as query is already at {} state (which is terminal).",
- newState, state);
- return;
+ /*
+ * These amount to a completion of the cancellation requests' cleanup;
+ * now we can clean up and send the result.
+ */
+ foremanResult.close();
}
+ return;
+
+ case CANCELED:
+ case COMPLETED:
+ case FAILED:
+ logger
+ .warn(
+ "Dropping request to move to {} state as query is already at {} state (which is terminal).",
+ newState, state);
+ return;
+ }
+
+ throw new IllegalStateException(String.format(
+ "Failure trying to change states: %s --> %s", state.name(),
+ newState.name()));
+ }
+
+ private class StateSwitch extends EventProcessor<StateEvent> {
+ public void addEvent(final QueryState newState, final Exception exception) {
+ sendEvent(new StateEvent(newState, exception));
+ }
- throw new IllegalStateException(String.format(
- "Failure trying to change states: %s --> %s", state.name(),
- newState.name()));
+ @Override
+ protected void processEvent(final StateEvent event) {
+ moveToState(event.newState, event.exception);
}
}
/**
- * Tells the foreman to move to a new state.
+ * Tells the foreman to move to a new state.<br>
+ * This will be added to the end of the event queue and will be processed once the foreman is ready
+ * to accept external events.
*
* @param newState the state to move to
* @param exception if not null, the exception that drove this state transition (usually a failure)
*/
- private void moveToState(final QueryState newState, final Exception exception) {
- stateSwitch.moveToState(newState, exception);
+ public void addToEventQueue(final QueryState newState, final Exception exception) {
+ stateSwitch.addEvent(newState, exception);
}
private void recordNewState(final QueryState newState) {
@@ -1196,13 +1200,13 @@ public class Foreman implements Runnable {
@Override
public void failed(final RpcException ex) {
- if (latch != null) {
+ if (latch != null) { // this block only applies to intermediate fragments
fragmentSubmitFailures.addFailure(endpoint, ex);
latch.countDown();
- } else {
+ } else { // this block only applies to leaf fragments
// since this won't be waited on, we can wait to deliver this event once the Foreman is ready
logger.debug("Failure while sending fragment. Stopping query.", ex);
- stateListener.moveToState(QueryState.FAILED, ex);
+ addToEventQueue(QueryState.FAILED, ex);
}
}
@@ -1217,28 +1221,6 @@ public class Foreman implements Runnable {
}
/**
- * Provides gated access to state transitions.
- *
- * <p>The StateListener waits on a latch before delivery state transitions to the Foreman. The
- * latch will be tripped when the Foreman is sufficiently set up that it can receive and process
- * external events from other threads.
- */
- public class StateListener {
- /**
- * Move the Foreman to the specified new state.
- *
- * @param newState the state to move to
- * @param ex if moving to a failure state, the exception that led to the failure; used for reporting
- * to the user
- */
- public void moveToState(final QueryState newState, final Exception ex) {
- acceptExternalEvents.awaitUninterruptibly();
-
- Foreman.this.moveToState(newState, ex);
- }
- }
-
- /**
* Listens for the status of the RPC response sent to the user for the query.
*/
private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@@ -1246,13 +1228,11 @@ public class Foreman implements Runnable {
public void failed(final RpcException ex) {
logger.info("Failure while trying communicate query result to initiating client. " +
"This would happen if a client is disconnected before response notice can be sent.", ex);
- stateListener.moveToState(QueryState.FAILED, ex);
}
@Override
public void interrupted(final InterruptedException e) {
logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
- stateListener.moveToState(QueryState.FAILED, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1a8430ea/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index f4ca42b..b76fd7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
@@ -53,7 +52,6 @@ import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
-import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.predicates.IntObjectPredicate;
@@ -79,7 +77,6 @@ public class QueryManager implements AutoCloseable {
.build();
private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
- private final StateListener stateListener;
private final QueryId queryId;
private final String stringQueryId;
private final RunQuery runQuery;
@@ -108,10 +105,9 @@ public class QueryManager implements AutoCloseable {
private final AtomicInteger finishedFragments = new AtomicInteger(0);
public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
- final ClusterCoordinator coordinator, final StateListener stateListener, final Foreman foreman) {
+ final ClusterCoordinator coordinator, final Foreman foreman) {
this.queryId = queryId;
this.runQuery = runQuery;
- this.stateListener = stateListener;
this.foreman = foreman;
stringQueryId = QueryIdHelper.getQueryId(queryId);
@@ -471,7 +467,7 @@ public class QueryManager implements AutoCloseable {
final int remaining = totalNodes - finishedNodes;
if (remaining == 0) {
// this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
- stateListener.moveToState(QueryState.COMPLETED, null);
+ foreman.addToEventQueue(QueryState.COMPLETED, null);
} else {
logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
this.fragmentDataSet.size() - finishedFragments.get());
@@ -494,7 +490,7 @@ public class QueryManager implements AutoCloseable {
break;
case FAILED:
- stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
+ foreman.addToEventQueue(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
// fall-through.
case FINISHED:
case CANCELLED:
@@ -548,7 +544,7 @@ public class QueryManager implements AutoCloseable {
if (atLeastOneFailure) {
logger.warn("Drillbits [{}] no longer registered in cluster. Canceling query {}",
failedNodeList, QueryIdHelper.getQueryId(queryId));
- stateListener.moveToState(QueryState.FAILED,
+ foreman.addToEventQueue(QueryState.FAILED,
new ForemanException(String.format("One more more nodes lost connectivity during query. Identified nodes were [%s].",
failedNodeList)));
}