You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/19 22:08:17 UTC
[7/9] drill git commit: DRILL-2245: Clean up query setup and
execution kickoff in Foreman/WorkManager in order to ensure consistent
handling, and avoid hangs and races,
with the goal of improving Drillbit robustness.
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3228da9..3a7123d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.work.batch;
import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
-import java.io.IOException;
-
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -33,7 +31,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcConstants;
@@ -42,123 +39,129 @@ import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
public class ControlHandlerImpl implements ControlMessageHandler {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
private final WorkerBee bee;
- public ControlHandlerImpl(WorkerBee bee) {
- super();
+ public ControlHandlerImpl(final WorkerBee bee) {
this.bee = bee;
}
@Override
- public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ public Response handle(final ControlConnection connection, final int rpcType,
+ final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received bit com message of type {}", rpcType);
}
switch (rpcType) {
- case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
- FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
cancelFragment(handle);
return DataRpcConfig.OK;
+ }
- case RpcType.REQ_RECEIVER_FINISHED_VALUE:
- FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+ case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+ final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
receivingFragmentFinished(finishedReceiver);
return DataRpcConfig.OK;
+ }
case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER));
+ bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
// TODO: Support a type of message that has no response.
return DataRpcConfig.OK;
- case RpcType.REQ_QUERY_CANCEL_VALUE:
- QueryId id = get(pBody, QueryId.PARSER);
- Foreman f = bee.getForemanForQueryId(id);
- if(f != null){
- f.cancel();
+ case RpcType.REQ_QUERY_CANCEL_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.cancel();
return DataRpcConfig.OK;
- }else{
+ } else {
return DataRpcConfig.FAIL;
}
+ }
- case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE:
- InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
- for(int i =0; i < fragments.getFragmentCount(); i++){
+ case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
+ final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+ for(int i = 0; i < fragments.getFragmentCount(); i++) {
startNewRemoteFragment(fragments.getFragment(i));
}
return DataRpcConfig.OK;
+ }
- case RpcType.REQ_QUERY_STATUS_VALUE:
- QueryId queryId = get(pBody, QueryId.PARSER);
- Foreman foreman = bee.getForemanForQueryId(queryId);
- QueryProfile profile;
+ case RpcType.REQ_QUERY_STATUS_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
if (foreman == null) {
throw new RpcException("Query not running on node.");
- } else {
- profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile();
}
+ final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
return new Response(RpcType.RESP_QUERY_STATUS, profile);
+ }
default:
throw new RpcException("Not yet supported.");
}
-
}
@Override
- public void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException {
+ public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
logger.debug("Received remote fragment start instruction", fragment);
+ final DrillbitContext drillbitContext = bee.getContext();
try {
// we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
- if(fragment.getLeafFragment()){
- FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, bee.getContext().getFunctionImplementationRegistry());
- ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
- NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener);
+ if (fragment.getLeafFragment()) {
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+ drillbitContext.getFunctionImplementationRegistry());
+ final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+ final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+ final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+ fragment.getFragmentJson());
+ final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
bee.addFragmentRunner(fr);
- }else{ // isIntermediate, store for incoming data.
- NonRootFragmentManager manager = new NonRootFragmentManager(fragment, bee);
- bee.getContext().getWorkBus().setFragmentManager(manager);
+ } else {
+ // isIntermediate, store for incoming data.
+ final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+ drillbitContext.getWorkBus().addFragmentManager(manager);
}
} catch (Exception e) {
- throw new UserRpcException(bee.getContext().getEndpoint(), "Failure while trying to start remote fragment", e);
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Failure while trying to start remote fragment", e);
} catch (OutOfMemoryError t) {
if (t.getMessage().startsWith("Direct buffer")) {
- throw new UserRpcException(bee.getContext().getEndpoint(), "Out of direct memory while trying to start remote fragment", t);
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Out of direct memory while trying to start remote fragment", t);
} else {
throw t;
}
}
-
}
/* (non-Javadoc)
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
@Override
- public Ack cancelFragment(FragmentHandle handle) {
- FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ public Ack cancelFragment(final FragmentHandle handle) {
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
if (manager != null) {
// try remote fragment cancel.
manager.cancel();
} else {
// then try local cancel.
- FragmentExecutor runner = bee.getFragmentRunner(handle);
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
if (runner != null) {
runner.cancel();
}
@@ -167,8 +170,9 @@ public class ControlHandlerImpl implements ControlMessageHandler {
return Acks.OK;
}
- public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) {
- FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+ private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+ final FragmentManager manager =
+ bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
FragmentExecutor executor;
if (manager != null) {
@@ -184,5 +188,4 @@ public class ControlHandlerImpl implements ControlMessageHandler {
return Acks.OK;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index f0b4983..2a79e42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
return batch;
} catch (InterruptedException e) {
return null;
+ // TODO InterruptedException
}
}
if (w == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 3d5b948..2430e64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
b = buffer.take();
} catch (InterruptedException e) {
return null;
+ // TODO InterruptedException
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
index ca52f0c..80f2ca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
@@ -26,7 +26,7 @@ import java.util.Set;
* Interface to define the listener to take actions when the set of active drillbits is changed.
*/
public interface DrillbitStatusListener {
-
+ // TODO this doesn't belong here
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class);
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8e0780b..9650ee5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -19,27 +19,27 @@ package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
+
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -64,10 +64,11 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.testing.ExceptionInjector;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.ErrorHelper;
@@ -76,117 +77,120 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
/**
- * Foreman manages all queries where this is the driving/root node.
+ * Foreman manages all the fragments (local and remote) for a single query where this
+ * is the driving/root node.
*
* The flow is as follows:
- * - Foreman is submitted as a runnable.
- * - Runnable does query planning.
- * - PENDING > RUNNING
- * - Runnable sends out starting fragments
- * - Status listener are activated
- * - Foreman listens for state move messages.
- *
+ * - Foreman is submitted as a runnable.
+ * - Runnable does query planning.
+ * - state changes from PENDING to RUNNING
+ * - Runnable sends out starting fragments
+ * - Status listeners are activated
+ * - The Runnable's run() completes, but the Foreman stays around
+ * - Foreman listens for state change messages.
+ * - state change messages can drive the state to FAILED or CANCELED, in which case
+ * messages are sent to running fragments to terminate
+ * - when all fragments complete, state change messages drive the state to COMPLETED
*/
-public class Foreman implements Runnable, Closeable, Comparable<Object> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-
- private QueryId queryId;
- private RunQuery queryRequest;
- private QueryContext context;
- private QueryManager queryManager;
- private WorkerBee bee;
- private UserClientConnection initiatingClient;
+public class Foreman implements Runnable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
+ private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+
+ private final QueryId queryId;
+ private final RunQuery queryRequest;
+ private final QueryContext queryContext;
+ private final QueryManager queryManager; // handles lower-level details of query execution
+ private final WorkerBee bee; // provides an interface to submit tasks
+ private final DrillbitContext drillbitContext;
+ private final UserClientConnection initiatingClient; // used to send responses
private volatile QueryState state;
- private final DistributedSemaphore smallSemaphore;
- private final DistributedSemaphore largeSemaphore;
- private final long queueThreshold;
- private final long queueTimeout;
- private volatile DistributedLease lease;
- private final boolean queuingEnabled;
+ private volatile DistributedLease lease; // used to limit the number of concurrent queries
+
+ private FragmentExecutor rootRunner; // root Fragment
- private FragmentExecutor rootRunner;
- private final CountDownLatch acceptExternalEvents = new CountDownLatch(1);
- private final StateListener stateListener = new StateListener();
+ private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+ private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
+ private final ForemanResult foremanResult = new ForemanResult();
- public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
- RunQuery queryRequest) {
+ /**
+ * Constructor. Sets up the Foreman, but does not initiate any execution.
+ *
+ * @param bee used to submit additional work
+ * @param drillbitContext the context for this drillbit
+ * @param connection the connection to the client that initiated the query
+ * @param queryId the id for the query
+ * @param queryRequest the query to execute
+ */
+ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
+ final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
+ this.bee = bee;
this.queryId = queryId;
this.queryRequest = queryRequest;
- this.context = new QueryContext(connection.getSession(), queryId, dContext);
+ this.drillbitContext = drillbitContext;
- // set up queuing
- this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
- if (queuingEnabled) {
- int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
- int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
- this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue);
- this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue);
- this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
- this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
- } else {
- this.largeSemaphore = null;
- this.smallSemaphore = null;
- this.queueThreshold = 0;
- this.queueTimeout = 0;
- }
- // end queuing setup.
-
- this.initiatingClient = connection;
- this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(),
- stateListener, this);
- this.bee = bee;
+ initiatingClient = connection;
+ queryContext = new QueryContext(connection.getSession(), drillbitContext);
+ queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
+ stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
recordNewState(QueryState.PENDING);
}
- public QueryContext getContext() {
- return context;
+ /**
+ * Get the QueryContext created for the query.
+ *
+ * @return the QueryContext
+ */
+ public QueryContext getQueryContext() {
+ return queryContext;
}
- public void cancel() {
- stateListener.moveToState(QueryState.CANCELED, null);
+ /**
+ * Get the QueryManager created for the query.
+ *
+ * @return the QueryManager
+ */
+ public QueryManager getQueryManager() {
+ return queryManager;
}
- private void cleanup(QueryResult result) {
- logger.info("foreman cleaning up - status: {}", queryManager.getStatus());
-
- bee.retireForeman(this);
- context.getWorkBus().removeFragmentStatusListener(queryId);
- context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
-
- try {
- try {
- context.close();
- } catch (Exception e) {
- moveToState(QueryState.FAILED, e);
- return;
- }
-
- if (result != null) {
- initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
- }
- } finally {
- releaseLease();
- }
+ /**
+ * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be
+ * terminated.
+ */
+ public void cancel() {
+ // Note this can be called from outside of run() on another thread, or after run() completes
+ stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null);
}
/**
- * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
+ * Called by execution pool to do query setup, and kick off remote execution.
+ *
+ * <p>Note that completion of this function is not the end of the Foreman's role
+ * in the query's lifecycle.
*/
+ @Override
public void run() {
+ // rename the thread we're using for debugging purposes
+ final Thread currentThread = Thread.currentThread();
+ final String originalName = currentThread.getName();
+ currentThread.setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
+
+ // track how long the query takes
+ queryManager.markStartTime();
- final String originalThread = Thread.currentThread().getName();
- Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
- getStatus().markStart();
- // convert a run query request into action
try {
+ injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class);
+
+ // convert a run query request into action
switch (queryRequest.getType()) {
case LOGICAL:
parseAndRunLogicalPlan(queryRequest.getPlan());
@@ -200,39 +204,71 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
default:
throw new IllegalStateException();
}
+ injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
} catch (ForemanException e) {
moveToState(QueryState.FAILED, e);
-
} catch (AssertionError | Exception ex) {
- moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
-
+ moveToState(QueryState.FAILED,
+ new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
} catch (OutOfMemoryError e) {
+ /*
+ * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
+ * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
+ * attempt to notify them, which might not work under these conditions.
+ */
+ /*
+ * TODO this will kill everything in this JVM; why can't we just free all allocation
+ * associated with this Foreman and allow others to continue?
+ */
System.out.println("Out of memory, exiting.");
e.printStackTrace();
System.out.flush();
System.exit(-1);
-
} finally {
- Thread.currentThread().setName(originalThread);
+ /*
+ * Begin accepting external events.
+ *
+ * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
+ * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
+ * as a result of any partial setup that was done (such as the FragmentSubmitListener,
+ * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
+ * event delivery call.
+ *
+ * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
+ * make sure that we can't make things any worse as those events are delivered, but allow
+ * any necessary remaining cleanup to proceed.
+ */
+ acceptExternalEvents.countDown();
+
+ // restore the thread's original name
+ currentThread.setName(originalName);
}
+
+ /*
+ * Note that despite the run() completing, the Foreman continues to exist, and receives
+ * events (indirectly, through the QueryManager's use of stateListener), about fragment
+ * completions. It won't go away until everything is completed, failed, or cancelled.
+ */
}
private void releaseLease() {
- if (lease != null) {
+ while (lease != null) {
try {
lease.close();
+ lease = null;
+ } catch (InterruptedException e) {
+ // if we end up here, the while loop will try again
} catch (Exception e) {
logger.warn("Failure while releasing lease.", e);
+ break;
}
- ;
}
-
}
- private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException {
+ private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
LogicalPlan logicalPlan;
try {
- logicalPlan = context.getPlanReader().readLogicalPlan(json);
+ logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
} catch (IOException e) {
throw new ForemanException("Failure parsing logical plan.", e);
}
@@ -244,7 +280,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
log(logicalPlan);
- PhysicalPlan physicalPlan = convert(logicalPlan);
+ final PhysicalPlan physicalPlan = convert(logicalPlan);
if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
returnPhysical(physicalPlan);
@@ -252,20 +288,19 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
log(physicalPlan);
-
runPhysicalPlan(physicalPlan);
}
- private void log(LogicalPlan plan) {
+ private void log(final LogicalPlan plan) {
if (logger.isDebugEnabled()) {
- logger.debug("Logical {}", plan.unparse(context.getConfig()));
+ logger.debug("Logical {}", plan.unparse(queryContext.getConfig()));
}
}
- private void log(PhysicalPlan plan) {
+ private void log(final PhysicalPlan plan) {
if (logger.isDebugEnabled()) {
try {
- String planText = context.getConfig().getMapper().writeValueAsString(plan);
+ String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
logger.debug("Physical {}", planText);
} catch (IOException e) {
logger.warn("Error while attempting to log physical plan.", e);
@@ -273,60 +308,54 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
}
- private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException {
- String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
- runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan)));
+ private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
+ final String jsonPlan = plan.unparse(queryContext.getConfig().getMapper().writer());
+ runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(jsonPlan)));
}
public static class PhysicalFromLogicalExplain {
- public String json;
+ public final String json;
- public PhysicalFromLogicalExplain(String json) {
- super();
+ public PhysicalFromLogicalExplain(final String json) {
this.json = json;
}
-
}
- private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException {
+ private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
try {
- PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
+ final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
runPhysicalPlan(plan);
} catch (IOException e) {
throw new ForemanSetupException("Failure while parsing physical plan.", e);
}
}
- private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException {
-
+ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
setupSortMemoryAllocations(plan);
acquireQuerySemaphore(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
-
- this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager);
- this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager);
-
- logger.debug("Submitting fragments to run.");
-
+ final List<PlanFragment> planFragments = work.getFragments();
final PlanFragment rootPlanFragment = work.getRootFragment();
assert queryId == rootPlanFragment.getHandle().getQueryId();
- queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size());
+ drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
+ drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+
+ logger.debug("Submitting fragments to run.");
// set up the root fragment first so we'll have incoming buffers available.
setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
- setupNonRootFragments(work.getFragments());
- bee.getContext().getAllocator().resetFragmentLimits();
+ setupNonRootFragments(planFragments);
+ drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
moveToState(QueryState.RUNNING, null);
logger.debug("Fragments running.");
-
}
- private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{
+ private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
if (plan.getProperties().resultMode != ResultMode.EXEC) {
throw new ForemanSetupException(String.format(
"Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC",
@@ -334,369 +363,632 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
}
- private void setupSortMemoryAllocations(PhysicalPlan plan){
- int sortCount = 0;
+ private void setupSortMemoryAllocations(final PhysicalPlan plan) {
+ // look for external sorts
+ final List<ExternalSort> sortList = new LinkedList<>();
for (PhysicalOperator op : plan.getSortedOperators()) {
if (op instanceof ExternalSort) {
- sortCount++;
+ sortList.add((ExternalSort) op);
}
}
- if (sortCount > 0) {
- long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
+ // if there are any sorts, compute the maximum allocation, and set it on them
+ if (sortList.size() > 0) {
+ final OptionManager optionManager = queryContext.getOptions();
+ final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
- context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+ queryContext.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
maxAllocPerNode = Math.min(maxAllocPerNode,
- context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
- long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
+ optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+ final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
logger.debug("Max sort alloc: {}", maxSortAlloc);
- for (PhysicalOperator op : plan.getSortedOperators()) {
- if (op instanceof ExternalSort) {
- ((ExternalSort) op).setMaxAllocation(maxSortAlloc);
- }
+
+ for(ExternalSort externalSort : sortList) {
+ externalSort.setMaxAllocation(maxSortAlloc);
}
}
}
- private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException {
-
- double size = 0;
- for (PhysicalOperator ops : plan.getSortedOperators()) {
- size += ops.getCost();
- }
-
+ /**
+ * This limits the number of "small" and "large" queries that a Drill cluster will run
+ * simultaneously, if queueing is enabled. If the query is unable to run, this will block
+ * until it can. Beware that this is called under run(), and so will consume a Thread
+ * while it waits for the required distributed semaphore.
+ *
+ * @param plan the query plan
+ * @throws ForemanSetupException
+ */
+ private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
+ final OptionManager optionManager = queryContext.getOptions();
+ final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
if (queuingEnabled) {
+ final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+ double totalCost = 0;
+ for (PhysicalOperator ops : plan.getSortedOperators()) {
+ totalCost += ops.getCost();
+ }
+
try {
- if (size > this.queueThreshold) {
- this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+ @SuppressWarnings("resource")
+ final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
+ DistributedSemaphore distributedSemaphore;
+
+ // get the appropriate semaphore
+ if (totalCost > queueThreshold) {
+ final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
} else {
- this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+ final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
}
+
+ final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+ lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new ForemanSetupException("Unable to acquire slot for query.", e);
}
}
}
- private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
- PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
- Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+ private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+ final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+ final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+ final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
+ final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+ queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+ queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
+ initiatingClient.getSession(), queryContext.getQueryDateTimeInfo());
+
+ if (logger.isInfoEnabled()) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("PlanFragments for query ");
+ sb.append(queryId);
+ sb.append('\n');
+
+ final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+ final int fragmentCount = planFragments.size();
+ int fragmentIndex = 0;
+ for(PlanFragment planFragment : planFragments) {
+ final FragmentHandle fragmentHandle = planFragment.getHandle();
+ sb.append("PlanFragment(");
+ sb.append(++fragmentIndex);
+ sb.append('/');
+ sb.append(fragmentCount);
+ sb.append(") major_fragment_id ");
+ sb.append(fragmentHandle.getMajorFragmentId());
+ sb.append(" minor_fragment_id ");
+ sb.append(fragmentHandle.getMinorFragmentId());
+ sb.append('\n');
+
+ final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+ sb.append(" DrillbitEndpoint address ");
+ sb.append(endpointAssignment.getAddress());
+ sb.append('\n');
+
+ String jsonString = "<<malformed JSON>>";
+ sb.append(" fragment_json: ");
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+ jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+ } catch(Exception e) {
+ // we've already set jsonString to a fallback value
+ }
+ sb.append(jsonString);
- SimpleParallelizer parallelizer = new SimpleParallelizer(context);
- return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
- queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession(),
- context.getQueryDateTimeInfo());
+ logger.info(sb.toString());
+ }
+ }
+
+ return queryWorkUnit;
}
/**
- * Tells the foreman to move to a new state. Note that
- * @param state
- * @return
+ * Manages the end-state processing for Foreman.
+ *
+ * End-state processing is tricky, because even if a query appears to succeed,
+ * if we then encounter a problem during cleanup, we still want to mark the query as
+ * failed. So we have to construct the successful result we would send, and then
+ * clean up before we send that result, possibly changing that result if we encounter
+ * a problem during cleanup. We only send the result when there is nothing left to
+ * do, so it will account for any possible problems.
+ *
+ * The idea here is to make close()ing the ForemanResult do the final cleanup and
+ * sending. Closing the result must be the last thing that is done by Foreman.
*/
- private synchronized boolean moveToState(QueryState newState, Exception exception){
- logger.info("State change requested. {} --> {}", state, newState, exception);
- outside: switch(state) {
+ private class ForemanResult implements AutoCloseable {
+ private QueryState resultState = null;
+ private Exception resultException = null;
+ private boolean isClosed = false;
+
+ /**
+ * Set up the result for a COMPLETED or CANCELED state.
+ *
+ * <p>Note that before sending this result, we execute cleanup steps that could
+ * result in this result still being changed to a FAILED state.
+ *
+ * @param queryState one of COMPLETED or CANCELED
+ */
+ public void setCompleted(final QueryState queryState) {
+ Preconditions.checkArgument((queryState == QueryState.COMPLETED) || (queryState == QueryState.CANCELED));
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState == null);
+
+ resultState = queryState;
+ }
+
+ /**
+ * Set up the result for a FAILED state.
+ *
+ * <p>Failures that occur during cleanup processing will be added as suppressed
+ * exceptions.
+ *
+ * @param exception the exception that led to the FAILED state
+ */
+ public void setFailed(final Exception exception) {
+ Preconditions.checkArgument(exception != null);
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState == null);
+
+ resultState = QueryState.FAILED;
+ resultException = exception;
+ }
+
+ /**
+ * Add an exception to the result. All exceptions after the first become suppressed
+ * exceptions hanging off the first.
+ *
+ * @param exception the exception to add
+ */
+ private void addException(final Exception exception) {
+ Preconditions.checkNotNull(exception);
+
+ if (resultException == null) {
+ resultException = exception;
+ } else {
+ resultException.addSuppressed(exception);
+ }
+ }
- case PENDING:
- // since we're moving out of pending, we can now start accepting other changes in state.
- // This guarantees that the first state change is driven by the original thread.
- acceptExternalEvents.countDown();
+ /**
+ * Close the given resource, catching and adding any caught exceptions via
+ * {@link #addException(Exception)}. If an exception is caught, it will change
+ * the result state to FAILED, regardless of its current value.
+ *
+ * @param autoCloseable the resource to close
+ */
+ private void suppressingClose(final AutoCloseable autoCloseable) {
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState != null);
- if(newState == QueryState.RUNNING){
- recordNewState(QueryState.RUNNING);
- return true;
+ if (autoCloseable == null) {
+ return;
}
- // fall through to running behavior.
- //
- case RUNNING: {
+ try {
+ autoCloseable.close();
+ } catch(Exception e) {
+ /*
+ * Even if the query completed successfully, we'll still report failure if we have
+ * problems cleaning up.
+ */
+ resultState = QueryState.FAILED;
+ addException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState != null);
+
+ logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+
+ // These are straightforward removals from maps, so they won't throw.
+ drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
+ drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+
+ suppressingClose(queryContext);
+
+ /*
+ * We do our best to write the latest state, but even that could fail. If it does, we can't write
+ * the (possibly newly failing) state, so we continue on anyway.
+ *
+ * We only need to do this if the resultState differs from the last recorded state
+ */
+ if (resultState != state) {
+ suppressingClose(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ recordNewState(resultState);
+ }
+ });
+ }
+
+ /*
+ * Construct the response based on the latest resultState. The builder shouldn't fail.
+ */
+ final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
+ .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary
+ .setQueryId(queryId)
+ .setQueryState(resultState);
+ if (resultException != null) {
+ final DrillPBError error = ErrorHelper.logAndConvertError(queryContext.getCurrentEndpoint(),
+ ExceptionUtils.getRootCauseMessage(resultException), resultException, logger);
+ resultBuilder.addError(error);
+ }
+
+ /*
+ * If sending the result fails, we don't really have any way to modify the result we tried to send;
+ * it is possible it got sent but the result came from a later part of the code path. It is also
+ * possible the connection has gone away, so this is irrelevant because there's nowhere to
+ * send anything to.
+ */
+ try {
+ // send whatever result we ended up with
+ initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true);
+ } catch(Exception e) {
+ addException(e);
+ logger.warn("Exception sending result to client", resultException);
+ }
+
+ try {
+ releaseLease();
+ } finally {
+ isClosed = true;
+ }
+ }
+ }
+
+ /**
+ * Tells the foreman to move to a new state.
+ *
+ * @param newState the state to move to
+ * @param exception if not null, the exception that drove this state transition (usually a failure)
+ */
+ private synchronized void moveToState(final QueryState newState, final Exception exception) {
+ logger.info("State change requested. {} --> {}", state, newState, exception);
+ switch(state) {
+ case PENDING:
+ if (newState == QueryState.RUNNING) {
+ recordNewState(QueryState.RUNNING);
+ return;
+ }
- switch(newState){
+ //$FALL-THROUGH$
- case CANCELED: {
+ case RUNNING: {
+ /*
+ * For cases that cancel executing fragments, we have to record the new state first, because
+ * the cancellation of the local root fragment will cause this to be called recursively.
+ */
+ switch(newState) {
+ case CANCELLATION_REQUESTED: {
assert exception == null;
- recordNewState(QueryState.CANCELED);
- cancelExecutingFragments();
- QueryResult result = QueryResult.newBuilder() //
- .setQueryId(queryId) //
- .setQueryState(QueryState.CANCELED) //
- .setIsLastChunk(true) //
- .build();
-
- // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to
- // a terminal state(completed, failed)
- initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
- return true;
+ queryManager.markEndTime();
+ recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setCompleted(QueryState.CANCELED);
+ /*
+ * We don't close the foremanResult until we've gotten acknowledgements, which
+ * happens below in the case for current state == CANCELLATION_REQUESTED.
+ */
+ return;
}
case COMPLETED: {
assert exception == null;
+ queryManager.markEndTime();
recordNewState(QueryState.COMPLETED);
- QueryResult result = QueryResult //
- .newBuilder() //
- .setQueryState(QueryState.COMPLETED) //
- .setQueryId(queryId) //
- .build();
- cleanup(result);
- return true;
+ foremanResult.setCompleted(QueryState.COMPLETED);
+ foremanResult.close();
+ return;
}
-
- case FAILED:
+ case FAILED: {
assert exception != null;
+ queryManager.markEndTime();
recordNewState(QueryState.FAILED);
- cancelExecutingFragments();
- DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(),
- ExceptionUtils.getRootCauseMessage(exception), exception, logger);
- QueryResult result = QueryResult //
- .newBuilder() //
- .addError(error) //
- .setIsLastChunk(true) //
- .setQueryState(QueryState.FAILED) //
- .setQueryId(queryId) //
- .build();
- cleanup(result);
- return true;
- default:
- break outside;
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ }
+ default:
+ throw new IllegalStateException("illegal transition from RUNNING to " + newState);
}
}
+ case CANCELLATION_REQUESTED:
+ if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
+ || (newState == QueryState.FAILED)) {
+ /*
+ * These amount to a completion of the cancellation request's cleanup; now we
+ * can clean up and send the result.
+ */
+ foremanResult.close();
+ }
+ return;
+
case CANCELED:
case COMPLETED:
- case FAILED: {
- // no op.
- logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
- return false;
- }
-
- }
-
- throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
- }
-
- private void cancelExecutingFragments(){
-
- // Stop all framgents with a currently active status.
- List<FragmentData> fragments = getStatus().getFragmentData();
- Collections.sort(fragments, new Comparator<FragmentData>() {
- @Override
- public int compare(FragmentData o1, FragmentData o2) {
- return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
- }
- });
- for(FragmentData data: fragments){
- FragmentHandle handle = data.getStatus().getHandle();
- switch(data.getStatus().getProfile().getState()){
- case SENDING:
- case AWAITING_ALLOCATION:
- case RUNNING:
- if(data.isLocal()){
- if(rootRunner != null){
- rootRunner.cancel();
- }
- }else{
- bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
- }
- break;
- default:
- break;
- }
+ case FAILED:
+ logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
+ newState, state);
+ return;
}
+ throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
+ state.name(), newState.name()));
}
- private QueryStatus getStatus(){
- return queryManager.getStatus();
- }
-
- private void recordNewState(QueryState newState){
- this.state = newState;
- getStatus().updateQueryStateInStore(newState);
+ private void recordNewState(final QueryState newState) {
+ state = newState;
+ queryManager.updateQueryStateInStore(newState);
}
- private void runSQL(String sql) throws ExecutionSetupException {
- DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
- Pointer<String> textPlan = new Pointer<>();
- PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
- getStatus().setPlanText(textPlan.value);
+ private void runSQL(final String sql) throws ExecutionSetupException {
+ final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
+ final Pointer<String> textPlan = new Pointer<>();
+ final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
+ queryManager.setPlanText(textPlan.value);
runPhysicalPlan(plan);
}
- private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+ private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
if (logger.isDebugEnabled()) {
- logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
+ logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
}
- return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(
- new BasicOptimizer.BasicOptimizationContext(context), plan);
+ return new BasicOptimizer(queryContext).optimize(
+ new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
}
public QueryId getQueryId() {
return queryId;
}
- @Override
- public void close() throws IOException {
- }
-
- public QueryStatus getQueryStatus() {
- return this.queryManager.getStatus();
- }
-
- private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException {
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext()
- .getFunctionImplementationRegistry());
-
- IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
-
+ /**
+ * Set up the root fragment (which will run locally), and submit it for execution.
+ *
+ * @param rootFragment
+ * @param rootClient
+ * @param rootOperator
+ * @throws ExecutionSetupException
+ */
+ private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
+ final FragmentRoot rootOperator) throws ExecutionSetupException {
+ @SuppressWarnings("resource")
+ final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
+ drillbitContext.getFunctionImplementationRegistry());
+ @SuppressWarnings("resource")
+ final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
rootContext.setBuffers(buffers);
- // add fragment to local node.
queryManager.addFragmentStatusTracker(rootFragment, true);
- this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment));
- RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+ rootRunner = new FragmentExecutor(rootContext, rootOperator,
+ queryManager.getRootStatusHandler(rootContext));
+ final RootFragmentManager fragmentManager =
+ new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
if (buffers.isDone()) {
// if we don't have to wait for any incoming data, start the fragment runner.
bee.addFragmentRunner(fragmentManager.getRunnable());
} else {
// if we do, record the fragment manager in the workBus.
- bee.getContext().getWorkBus().setFragmentManager(fragmentManager);
+ // TODO aren't we managing our own work? What does this do? It looks like this will never get run
+ drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
}
}
- private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{
- Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
- Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
+ /**
+ * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+ * Messages are sent immediately, so they may start returning data even before we complete this.
+ *
+ * @param fragments the fragments
+ * @throws ForemanException
+ */
+ private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
+ /*
+ * We will send a single message to each endpoint, regardless of how many fragments will be
+ * executed there. We need to start up the intermediate fragments first so that they will be
+ * ready once the leaf fragments start producing data. To satisfy both of these, we will
+ * make a pass through the fragments and put them into these two maps according to their
+ * leaf/intermediate state, as well as their target drillbit.
+ */
+ final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+ final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
// record all fragments for status purposes.
- for (PlanFragment f : fragments) {
+ for (PlanFragment planFragment : fragments) {
// logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
- queryManager.addFragmentStatusTracker(f, false);
- if (f.getLeafFragment()) {
- leafFragmentMap.put(f.getAssignment(), f);
+ queryManager.addFragmentStatusTracker(planFragment, false);
+ if (planFragment.getLeafFragment()) {
+ leafFragmentMap.put(planFragment.getAssignment(), planFragment);
} else {
- intFragmentMap.put(f.getAssignment(), f);
+ intFragmentMap.put(planFragment.getAssignment(), planFragment);
}
}
- CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
+ /*
+ * We need to wait for the intermediates to be sent so that they'll be set up by the time
+ * the leaves start producing data. We'll use this latch to wait for the responses.
+ *
+ * However, in order not to hang the process if any of the RPC requests fails, we always
+ * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+ * know if any submissions did fail.
+ */
+ final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+ final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
- sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
+ sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
- // wait for send complete
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e);
+ // wait for the status of all requests sent above to be known
+ boolean ready = false;
+ while(!ready) {
+ try {
+ endpointLatch.await();
+ ready = true;
+ } catch (InterruptedException e) {
+ // if we weren't ready, the while loop will continue to wait
+ }
}
- // send remote (leaf) fragments.
- for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
- sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
+ // if any of the intermediate fragment submissions failed, fail the query
+ final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+ fragmentSubmitFailures.submissionExceptions;
+ if (submissionExceptions.size() > 0) {
+ throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
+ submissionExceptions.get(0).rpcException);
+ // TODO indicate the failing drillbit?
+ // TODO report on all the failures?
}
- }
- public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
- return new FragmentSubmitListener(endpoint, value, latch);
+ /*
+ * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+ * the regular sendListener event delivery.
+ */
+ for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+ sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
+ }
}
- private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
- Controller controller = bee.getContext().getController();
- InitializeFragments.Builder fb = InitializeFragments.newBuilder();
- for(PlanFragment f : fragments){
- fb.addFragment(f);
+ /**
+ * Send all the remote fragments belonging to a single target drillbit in one request.
+ *
+ * @param assignment the drillbit assigned to these fragments
+ * @param fragments the set of fragments
+ * @param latch the countdown latch used to track the requests to all endpoints
+ * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+ */
+ private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+ final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+ @SuppressWarnings("resource")
+ final Controller controller = drillbitContext.getController();
+ final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+ for(PlanFragment planFragment : fragments) {
+ fb.addFragment(planFragment);
}
- InitializeFragments initFrags = fb.build();
+ final InitializeFragments initFrags = fb.build();
logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
- FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
+ final FragmentSubmitListener listener =
+ new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
controller.getTunnel(assignment).sendFragments(listener, initFrags);
}
- public QueryState getState(){
+ public QueryState getState() {
return state;
}
- private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
+ /**
+ * Used by {@link FragmentSubmitListener} to track the number of submission failures.
+ */
+ private static class FragmentSubmitFailures {
+ static class SubmissionException {
+// final DrillbitEndpoint drillbitEndpoint;
+ final RpcException rpcException;
+
+ SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
+ final RpcException rpcException) {
+// this.drillbitEndpoint = drillbitEndpoint;
+ this.rpcException = rpcException;
+ }
+ }
+
+ final List<SubmissionException> submissionExceptions = new LinkedList<>();
+
+ void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
+ submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
+ }
+ }
- private CountDownLatch latch;
+ private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> {
+ private final CountDownLatch latch;
+ private final FragmentSubmitFailures fragmentSubmitFailures;
- public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
+ /**
+ * Constructor.
+ *
+ * @param endpoint the endpoint for the submission
+ * @param value the initialize fragments message
+ * @param latch the latch to count down when the status is known; may be null
+ * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+ */
+ public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+ final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
super(endpoint, value);
+ Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
this.latch = latch;
+ this.fragmentSubmitFailures = fragmentSubmitFailures;
}
@Override
- public void success(Ack ack, ByteBuf byteBuf) {
+ public void success(final Ack ack, final ByteBuf byteBuf) {
if (latch != null) {
latch.countDown();
}
}
@Override
- public void failed(RpcException ex) {
- logger.debug("Failure while sending fragment. Stopping query.", ex);
- moveToState(QueryState.FAILED, ex);
+ public void failed(final RpcException ex) {
+ if (latch != null) {
+ fragmentSubmitFailures.addFailure(endpoint, ex);
+ latch.countDown();
+ } else {
+ // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+ logger.debug("Failure while sending fragment. Stopping query.", ex);
+ stateListener.moveToState(QueryState.FAILED, ex);
+ }
}
-
}
-
+ /**
+ * Provides gated access to state transitions.
+ *
+ * <p>The StateListener waits on a latch before delivering state transitions to the Foreman. The
+ * latch will be tripped when the Foreman is sufficiently set up that it can receive and process
+ * external events from other threads.
+ */
public class StateListener {
- public boolean moveToState(QueryState newState, Exception ex){
- try {
- acceptExternalEvents.await();
- } catch(InterruptedException e){
- logger.warn("Interrupted while waiting to move state.", e);
- return false;
+ /**
+ * Move the Foreman to the specified new state.
+ *
+ * @param newState the state to move to
+ * @param ex if moving to a failure state, the exception that led to the failure; used for reporting
+ * to the user
+ */
+ public void moveToState(final QueryState newState, final Exception ex) {
+ boolean ready = false;
+ while(!ready) {
+ try {
+ acceptExternalEvents.await();
+ ready = true;
+ } catch(InterruptedException e) {
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting to move state.", e);
+ }
}
- return Foreman.this.moveToState(newState, ex);
+ Foreman.this.moveToState(newState, ex);
}
}
-
- @Override
- public int compareTo(Object o) {
- return hashCode() - o.hashCode();
- }
-
+ /**
+ * Listens for the status of the RPC response sent to the user for the query.
+ */
private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@Override
- public void failed(RpcException ex) {
- logger
- .info(
- "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
- ex);
- moveToState(QueryState.FAILED, ex);
+ public void failed(final RpcException ex) {
+ logger.info(
+ "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
+ ex);
+ stateListener.moveToState(QueryState.FAILED, ex);
}
}
-
-
- private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-
- public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
- super(endpoint, handle);
- }
-
- @Override
- public void failed(RpcException ex) {
- logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
- }
-
- @Override
- public void success(Ack value, ByteBuf buf) {
- if(!value.getOk()){
- logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
- }
- // do nothing.
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 52fd0a9..433ab26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -29,19 +29,21 @@ public class FragmentData {
private volatile long lastStatusUpdate = 0;
private final DrillbitEndpoint endpoint;
- public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
- super();
- MinorFragmentProfile f = MinorFragmentProfile.newBuilder() //
- .setState(FragmentState.SENDING) //
- .setMinorFragmentId(handle.getMinorFragmentId()) //
- .setEndpoint(endpoint) //
- .build();
- this.status = FragmentStatus.newBuilder().setHandle(handle).setProfile(f).build();
+ public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
this.endpoint = endpoint;
this.isLocal = isLocal;
+ final MinorFragmentProfile f = MinorFragmentProfile.newBuilder()
+ .setState(FragmentState.SENDING)
+ .setMinorFragmentId(handle.getMinorFragmentId())
+ .setEndpoint(endpoint)
+ .build();
+ status = FragmentStatus.newBuilder()
+ .setHandle(handle)
+ .setProfile(f)
+ .build();
}
- public void setStatus(FragmentStatus status){
+ public void setStatus(final FragmentStatus status) {
this.status = status;
lastStatusUpdate = System.currentTimeMillis();
}
@@ -54,15 +56,11 @@ public class FragmentData {
return isLocal;
}
- public long getLastStatusUpdate() {
- return lastStatusUpdate;
- }
-
public DrillbitEndpoint getEndpoint() {
return endpoint;
}
- public FragmentHandle getHandle(){
+ public FragmentHandle getHandle() {
return status.getHandle();
}
@@ -71,7 +69,4 @@ public class FragmentData {
return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate
+ ", endpoint=" + endpoint + "]";
}
-
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
index 6a719d2..b2a40ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
@@ -20,7 +20,5 @@ package org.apache.drill.exec.work.foreman;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
public interface FragmentStatusListener {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class);
-
public void statusUpdate(FragmentStatus status);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 2de3592..8626d5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -17,144 +17,353 @@
*/
package org.apache.drill.exec.work.foreman;
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RemoteRpcException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
/**
* Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments.
*/
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
- private final Set<DrillbitEndpoint> includedBits;
+public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+
+ public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
+ newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
+ .name("profiles")
+ .blob()
+ .max(100)
+ .build();
- private final QueryStatus status;
+ public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
+ newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+ .name("running")
+ .ephemeral()
+ .build();
+
+ private final Set<DrillbitEndpoint> includedBits;
private final StateListener stateListener;
- private final AtomicInteger remainingFragmentCount;
private final QueryId queryId;
+ private final String stringQueryId;
+ private final RunQuery runQuery;
+ private final Foreman foreman;
+
+ /*
+ * Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then
+ * accessed by multiple threads for reads only.
+ */
+ private final IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap =
+ new IntObjectOpenHashMap<>();
+ private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
+
+ private final PStore<QueryProfile> profilePStore;
+ private final PStore<QueryInfo> profileEStore;
- public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
+ // the following mutable variables are used to capture ongoing query status
+ private String planText;
+ private long startTime;
+ private long endTime;
+ private final AtomicInteger finishedFragments = new AtomicInteger(0);
+
+ public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
+ final StateListener stateListener, final Foreman foreman) {
+ this.queryId = queryId;
+ this.runQuery = runQuery;
this.stateListener = stateListener;
- this.queryId = id;
- this.remainingFragmentCount = new AtomicInteger(0);
- this.status = new QueryStatus(query, id, pStoreProvider, foreman);
- this.includedBits = Sets.newHashSet();
- }
+ this.foreman = foreman;
+
+ stringQueryId = QueryIdHelper.getQueryId(queryId);
+ try {
+ profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
+ profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
- public QueryStatus getStatus(){
- return status;
+ includedBits = Sets.newHashSet();
}
@Override
- public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+ public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
}
@Override
- public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) {
- for(DrillbitEndpoint ep : unregisteredDrillbits){
- if(this.includedBits.contains(ep)){
- logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}", ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
- this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One more more nodes lost connectivity during query. Identified node was " + ep.getAddress()));
+ public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+ for(DrillbitEndpoint ep : unregisteredDrillbits) {
+ if (includedBits.contains(ep)) {
+ logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}",
+ ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
+ stateListener.moveToState(QueryState.FAILED,
+ new ForemanException("One more more nodes lost connectivity during query. Identified node was "
+ + ep.getAddress()));
}
}
}
-
@Override
- public void statusUpdate(FragmentStatus status) {
-
- logger.debug("New fragment status was provided to Foreman of {}", status);
- switch(status.getProfile().getState()){
+ public void statusUpdate(final FragmentStatus status) {
+ logger.debug("New fragment status was provided to QueryManager of {}", status);
+ switch(status.getProfile().getState()) {
case AWAITING_ALLOCATION:
+ case RUNNING:
updateFragmentStatus(status);
break;
+
+ case FINISHED:
+ fragmentDone(status);
+ break;
+
case CANCELLED:
- //TODO: define a new query state to distinguish the state of early termination from cancellation
+ /*
+ * TODO
+ * This doesn't seem right; shouldn't this be similar to FAILED?
+ * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
+ *
+ * So, we add it to the finishedFragments if we ourselves receive a statusUpdate (from where),
+ * but not if our cancellation listener gets it?
+ */
+ // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
fragmentDone(status);
break;
+
case FAILED:
stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
break;
- case FINISHED:
- fragmentDone(status);
- break;
- case RUNNING:
- updateFragmentStatus(status);
- break;
+
default:
throw new UnsupportedOperationException(String.format("Received status of %s", status));
}
}
- private void updateFragmentStatus(FragmentStatus status){
- this.status.updateFragmentStatus(status);
+ private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+ final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
+ final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+ final int minorFragmentId = fragmentHandle.getMinorFragmentId();
+ fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
}
- private void fragmentDone(FragmentStatus status){
- this.status.incrementFinishedFragments();
- int remaining = remainingFragmentCount.decrementAndGet();
+ private void fragmentDone(final FragmentStatus status) {
updateFragmentStatus(status);
+
+ final int finishedFragments = this.finishedFragments.incrementAndGet();
+ final int totalFragments = fragmentDataSet.size();
+ assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
+ final int remaining = totalFragments - finishedFragments;
logger.debug("waiting for {} fragments", remaining);
- if(remaining == 0){
+ if (remaining == 0) {
+ // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
stateListener.moveToState(QueryState.COMPLETED, null);
}
}
- public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
- remainingFragmentCount.set(countOfNonRootFragments + 1);
- logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
- status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
- this.status.setTotalFragments(countOfNonRootFragments + 1);
+ private void addFragment(final FragmentData fragmentData) {
+ final FragmentHandle fragmentHandle = fragmentData.getHandle();
+ final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+ final int minorFragmentId = fragmentHandle.getMinorFragmentId();
- List<FragmentData> fragments = status.getFragmentData();
- for (FragmentData fragment : fragments) {
- this.includedBits.add(fragment.getEndpoint());
+ IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
+ if (minorMap == null) {
+ minorMap = new IntObjectOpenHashMap<>();
+ fragmentDataMap.put(majorFragmentId, minorMap);
}
+ minorMap.put(minorFragmentId, fragmentData);
+ fragmentDataSet.add(fragmentData);
+
+ // keep track of all the drill bits that are used by this query
+ includedBits.add(fragmentData.getEndpoint());
+ }
+
+ public String getFragmentStatesAsString() {
+ return fragmentDataMap.toString();
}
- public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){
- addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot);
+ void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
+ addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
}
- public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){
- status.add(new FragmentData(handle, node, isRoot));
+ /**
+ * Stop all fragments with a currently active status.
+ */
+ void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ final Controller controller = drillbitContext.getController();
+ for(FragmentData data : fragmentDataSet) {
+ final FragmentStatus fragmentStatus = data.getStatus();
+ switch(fragmentStatus.getProfile().getState()) {
+ case SENDING:
+ case AWAITING_ALLOCATION:
+ case RUNNING:
+ if (rootRunner != null) {
+ rootRunner.cancel();
+ } else {
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ final FragmentHandle handle = fragmentStatus.getHandle();
+ // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+ controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+ }
+ break;
+
+ case FINISHED:
+ case CANCELLED:
+ case FAILED:
+ // nothing to do
+ break;
+ }
+ }
}
- public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){
- return new RootStatusReporter(context, fragment);
+ /*
+ * This assumes that the FragmentStatusListener implementation takes action when it hears
+ * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+ * but log messages.
+ */
+ private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
+ public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+ super(endpoint, handle);
+ }
+
+ @Override
+ public void failed(final RpcException ex) {
+ logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+ }
+
+ @Override
+ public void success(final Ack value, final ByteBuf buf) {
+ if (!value.getOk()) {
+ logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+ }
+ }
}
- class RootStatusReporter extends AbstractStatusReporter{
+ public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
+ return new RootStatusReporter(context);
+ }
- private RootStatusReporter(FragmentContext context, PlanFragment fragment){
+ class RootStatusReporter extends AbstractStatusReporter {
+ private RootStatusReporter(final FragmentContext context) {
super(context);
}
@Override
- protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+ protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
statusUpdate(status);
}
+ }
+
+ QueryState updateQueryStateInStore(final QueryState queryState) {
+ switch (queryState) {
+ case PENDING:
+ case RUNNING:
+ case CANCELLATION_REQUESTED:
+ profileEStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
+ break;
+
+ case COMPLETED:
+ case CANCELED:
+ case FAILED:
+ try {
+ profileEStore.delete(stringQueryId);
+ } catch(Exception e) {
+ logger.warn("Failure while trying to delete the estore profile for this query.", e);
+ }
+
+ // TODO(DRILL-2362) when do these ever get deleted?
+ profilePStore.put(stringQueryId, getQueryProfile());
+ break;
+
+ default:
+ throw new IllegalStateException("unrecognized queryState " + queryState);
+ }
+
+ return queryState;
+ }
+
+ private QueryInfo getQueryInfo() {
+ return QueryInfo.newBuilder()
+ .setQuery(runQuery.getPlan())
+ .setState(foreman.getState())
+ .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+ .setStart(startTime)
+ .build();
+ }
+
+ public QueryProfile getQueryProfile() {
+ final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
+ .setQuery(runQuery.getPlan())
+ .setType(runQuery.getType())
+ .setId(queryId)
+ .setState(foreman.getState())
+ .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+ .setStart(startTime)
+ .setEnd(endTime)
+ .setTotalFragments(fragmentDataSet.size())
+ .setFinishedFragments(finishedFragments.get());
+
+ if (planText != null) {
+ profileBuilder.setPlan(planText);
+ }
+
+ for (int i = 0; i < fragmentDataMap.allocated.length; i++) {
+ if (fragmentDataMap.allocated[i]) {
+ final int majorFragmentId = fragmentDataMap.keys[i];
+ final IntObjectOpenHashMap<FragmentData> minorMap =
+ (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i];
+ final MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder()
+ .setMajorFragmentId(majorFragmentId);
+ for (int v = 0; v < minorMap.allocated.length; v++) {
+ if (minorMap.allocated[v]) {
+ final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
+ fb.addMinorFragmentProfile(data.getStatus().getProfile());
+ }
+ }
+ profileBuilder.addFragmentProfile(fb);
+ }
+ }
+ return profileBuilder.build();
+ }
+
+ void setPlanText(final String planText) {
+ this.planText = planText;
+ }
+ void markStartTime() {
+ startTime = System.currentTimeMillis();
}
+ void markEndTime() {
+ endTime = System.currentTimeMillis();
+ }
}