You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:54 UTC

[09/12] drill git commit: DRILL-2697: Pauses sites wait indefinitely for a resume signal DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through th

DRILL-2697: Pause sites wait indefinitely for a resume signal. DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the Control layer.

+ Better error messages and more tests in TestDrillbitResilience and TestPauseInjection
+ Added execution controls to operator context
+ Removed ControlMessageHandler interface, renamed ControlHandlerImpl to ControlMessageHandler
+ Added CountDownLatchInjection, useful in cases like PartitionedSender that spawns multiple threads


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/52dcd7e4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/52dcd7e4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/52dcd7e4

Branch: refs/heads/merge_2015_05_09
Commit: 52dcd7e460e37031a08f6194cd597cc035d25a65
Parents: 87051d4
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Thu Apr 30 13:27:08 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:27:11 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |  10 +-
 .../apache/drill/exec/ops/OperatorContext.java  |   3 +
 .../drill/exec/ops/OperatorContextImpl.java     |   8 +
 .../drill/exec/physical/impl/ScreenCreator.java |   6 +-
 .../exec/rpc/control/ControlRpcConfig.java      |   4 +-
 .../drill/exec/rpc/control/ControlTunnel.java   |  20 +-
 .../drill/exec/rpc/user/UserRpcConfig.java      |   3 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |   9 +
 .../drill/exec/store/pojo/PojoRecordReader.java |  16 +-
 .../exec/testing/CountDownLatchInjection.java   |  51 ++++
 .../testing/CountDownLatchInjectionImpl.java    |  85 ++++++
 .../drill/exec/testing/ExecutionControls.java   |  37 ++-
 .../exec/testing/ExecutionControlsInjector.java |  13 +-
 .../apache/drill/exec/testing/Injection.java    |   6 +-
 .../exec/testing/NoOpControlsInjector.java      |  36 ++-
 .../drill/exec/testing/PauseInjection.java      |  33 +-
 .../org/apache/drill/exec/work/WorkManager.java |   3 +-
 .../exec/work/batch/ControlHandlerImpl.java     | 197 ------------
 .../exec/work/batch/ControlMessageHandler.java  | 195 +++++++++++-
 .../apache/drill/exec/work/foreman/Foreman.java |  25 +-
 .../drill/exec/work/foreman/QueryManager.java   |  44 ++-
 .../exec/work/fragment/FragmentExecutor.java    |  15 +-
 .../exec/work/fragment/FragmentManager.java     |   6 +
 .../work/fragment/NonRootFragmentManager.java   |   5 +
 .../exec/work/fragment/RootFragmentManager.java |   9 +-
 .../apache/drill/exec/work/user/UserWorker.java |   8 +
 .../exec/server/TestDrillbitResilience.java     | 303 +++++++++++--------
 .../testing/TestCountDownLatchInjection.java    | 155 ++++++++++
 .../drill/exec/testing/TestPauseInjection.java  | 146 ++++++++-
 .../org/apache/drill/exec/proto/BitControl.java |  48 ++-
 .../org/apache/drill/exec/proto/UserProtos.java |  46 ++-
 .../apache/drill/exec/proto/beans/RpcType.java  |   2 +
 protocol/src/main/protobuf/BitControl.proto     |  12 +-
 protocol/src/main/protobuf/User.proto           |   1 +
 34 files changed, 1106 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 136d8c7..c642c4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -312,10 +312,18 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
-    logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+    if(logger.isDebugEnabled()) {
+      logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+    }
     return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
   }
 
+  public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
+    if(logger.isDebugEnabled()) {
+      logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId));
+    }
+    return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
+  }
 
   /**
    * Submits a Logical plan for direct execution (bypasses parsing)

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 7cc52ba..35139d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 public abstract class OperatorContext {
 
@@ -36,6 +37,8 @@ public abstract class OperatorContext {
 
   public abstract OperatorStats getStats();
 
+  public abstract ExecutionControls getExecutionControls();
+
   public static int getChildCount(PhysicalOperator popConfig) {
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 6dbd880..9fa8867 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -24,11 +24,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
 
   private final BufferAllocator allocator;
+  private final ExecutionControls executionControls;
   private boolean closed = false;
   private PhysicalOperator popConfig;
   private OperatorStats stats;
@@ -42,6 +44,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
 
     OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
     this.stats = context.getStats().getOperatorStats(def, allocator);
+    executionControls = context.getExecutionControls();
   }
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
@@ -49,6 +52,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
     this.popConfig = popConfig;
     this.stats     = stats;
+    executionControls = context.getExecutionControls();
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
@@ -70,6 +74,10 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return newBuf;
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   public BufferAllocator getAllocator() {
     if (allocator == null) {
       throw new UnsupportedOperationException("Operator context does not have an allocator");

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c31de66..76dc91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -52,7 +52,6 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
   static class ScreenRoot extends BaseRootExec {
-//    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     private final RecordBatch incoming;
     private final FragmentContext context;
     private final AccountingUserConnection userConnection;
@@ -136,6 +135,11 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
 
+    @Override
+    public void close() throws Exception {
+      injector.injectPause(context.getExecutionControls(), "send-complete", logger);
+      super.close();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index f92bb49..0cfa56e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -40,16 +40,18 @@ public class ControlRpcConfig {
         .name("CONTROL")
         .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
         .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
-        .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+        .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
         .build();
   }
 
   public static int RPC_VERSION = 3;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+  public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index a4f9fdf..16b9b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,12 +17,9 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import java.util.Collection;
-
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -56,7 +53,12 @@ public class ControlTunnel {
   }
 
   public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
-    CancelFragment b = new CancelFragment(outcomeListener, handle);
+    final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_CANCEL_FRAGMENT);
+    manager.runCommand(b);
+  }
+
+  public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
+    final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
     manager.runCommand(b);
   }
 
@@ -114,17 +116,19 @@ public class ControlTunnel {
     }
   }
 
-  public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
+  public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
     final FragmentHandle handle;
+    final RpcType type;
 
-    public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+    public SignalFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle, RpcType type) {
       super(listener);
       this.handle = handle;
+      this.type = type;
     }
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+      connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
     }
 
   }
@@ -139,7 +143,7 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class);
+      connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index ae728d8..3f8122d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -36,11 +36,12 @@ public class UserRpcConfig {
     return RpcConfig.newBuilder()
         .name("USER")
         .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
-        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
         .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
         .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
         .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
         .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+        .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index b3b7ae9..72b07ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -113,6 +113,15 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
         throw new RpcException("Failure while decoding QueryId body.", e);
       }
 
+    case RpcType.RESUME_PAUSED_QUERY_VALUE:
+      try {
+        final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final Ack ack = worker.resumeQuery(queryId);
+        return new Response(RpcType.ACK, ack);
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding QueryId body.", e);
+      }
+
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index cf98b83..a893da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -42,13 +42,16 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
 import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
 import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 
 public class PojoRecordReader<T> extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+  private static final ExecutionControlsInjector injector =
+    ExecutionControlsInjector.getInjector(PojoRecordReader.class);
 
   public final int forJsonIgnore = 1;
 
@@ -64,16 +67,9 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
     this.iterator = iterator;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    operatorContext = context;
     try {
       Field[] fields = pojoClass.getDeclaredFields();
       List<PojoWriter> writers = Lists.newArrayList();
@@ -147,7 +143,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   @Override
   public int next() {
     boolean allocated = false;
-
+    injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
     try {
       int i =0;
       outside:

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
new file mode 100644
index 0000000..de4a181
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * This class is used internally for tracking injected countdown latches. These latches are specified via
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * This injection is useful in the case where a thread spawns multiple threads. The parent thread initializes the latch
+ * with the expected number of countdown and awaits. The child threads count down on the same latch (same site class
+ * and same descriptor), and once there are enough, the parent thread continues.
+ */
+public interface CountDownLatchInjection {
+
+  /**
+   * Initializes the underlying latch
+   * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
+   */
+  void initialize(final int count);
+
+  /**
+   * Causes the current thread to wait until the latch has counted down to zero, unless the thread is
+   * {@link Thread#interrupt interrupted}.
+   */
+  void await() throws InterruptedException;
+
+  /**
+   * Await without interruption. In the case of interruption, log a warning and continue to wait.
+   */
+  void awaitUninterruptibly();
+
+  /**
+   * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
+   */
+  void countDown();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
new file mode 100644
index 0000000..f4012c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * See {@link org.apache.drill.exec.testing.CountDownLatchInjection}. Degenerates to
+ * {@link org.apache.drill.exec.testing.PauseInjection#pause}, if initialized to zero count. In any case, this injection
+ * provides more control than PauseInjection.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class CountDownLatchInjectionImpl extends Injection implements CountDownLatchInjection {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountDownLatchInjectionImpl.class);
+
+  private ExtendedLatch latch = null;
+
+  @JsonCreator // ensures instances are created only through JSON
+  private CountDownLatchInjectionImpl(@JsonProperty("address") final String address,
+                                      @JsonProperty("port") final int port,
+                                      @JsonProperty("siteClass") final String siteClass,
+                                      @JsonProperty("desc") final String desc) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, 0, 1);
+  }
+
+  @Override
+  protected boolean injectNow() {
+    return true;
+  }
+
+  @Override
+  public void initialize(final int count) {
+    Preconditions.checkArgument(latch == null, "Latch can be initialized only once at %s in %s.", desc,
+      siteClass.getSimpleName());
+    Preconditions.checkArgument(count > 0, "Count has to be a positive integer at %s in %s.", desc,
+      siteClass.getSimpleName());
+    latch = new ExtendedLatch(count);
+  }
+
+  @Override
+  public void await() throws InterruptedException {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    try {
+      latch.await();
+    } catch (final InterruptedException e) {
+      logger.warn("Interrupted while awaiting in %s at %s.", siteClass.getSimpleName(), desc);
+      throw e;
+    }
+  }
+
+  @Override
+  public void awaitUninterruptibly() {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    latch.awaitUninterruptibly();
+  }
+
+  @Override
+  public void countDown() {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
+    latch.countDown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 1171bf8..639802f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -50,13 +50,15 @@ public final class ExecutionControls {
     controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
   }
 
-  // Jackson MixIn for all types of injections
+  // Jackson MixIn: an annotated class that is used only by Jackson's ObjectMapper to allow a list of injections to
+  // hold various types of injections
   @JsonTypeInfo(
     use = JsonTypeInfo.Id.NAME,
     include = JsonTypeInfo.As.PROPERTY,
     property = "type")
   @JsonSubTypes({
     @Type(value = ExceptionInjection.class, name = "exception"),
+    @Type(value = CountDownLatchInjectionImpl.class, name = "latch"),
     @Type(value = PauseInjection.class, name = "pause")})
   public static abstract class InjectionMixIn {
   }
@@ -99,7 +101,7 @@ public final class ExecutionControls {
       final String jsonString = v.string_val;
       try {
         controlsOptionMapper.readValue(jsonString, Controls.class);
-      } catch (IOException e) {
+      } catch (final IOException e) {
         throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
       }
     }
@@ -137,7 +139,7 @@ public final class ExecutionControls {
     final Controls controls;
     try {
       controls = controlsOptionMapper.readValue(opString, Controls.class);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       // This never happens. opString must have been validated.
       logger.warn("Could not parse injections. Injections must have been validated before this point.");
       throw new DrillRuntimeException("Could not parse injections.", e);
@@ -153,7 +155,7 @@ public final class ExecutionControls {
   }
 
   /**
-   * Look for an exception injection matching the given injector, site description and endpoint.
+   * Look for an exception injection matching the given injector, site descriptor, and endpoint.
    *
    * @param injector the injector, which indicates a class
    * @param desc     the injection site description
@@ -165,7 +167,7 @@ public final class ExecutionControls {
   }
 
   /**
-   * Look for an pause injection matching the given injector, site description and endpoint.
+   * Look for a pause injection matching the given injector, site descriptor, and endpoint.
    *
    * @param injector the injector, which indicates a class
    * @param desc     the injection site description
@@ -176,6 +178,20 @@ public final class ExecutionControls {
     return injection != null ? (PauseInjection) injection : null;
   }
 
+  /**
+   * Look for a count down latch injection matching the given injector, site descriptor, and endpoint.
+   *
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the count down latch injection, if there is one for the injector, site and endpoint;
+   * otherwise, a latch that does nothing
+   */
+  public CountDownLatchInjection lookupCountDownLatchInjection(final ExecutionControlsInjector injector,
+                                                               final String desc) {
+    final Injection injection = lookupInjection(injector, desc);
+    return injection != null ? (CountDownLatchInjection) injection : NoOpControlsInjector.LATCH;
+  }
+
   private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
     if (controls.isEmpty()) {
       return null;
@@ -190,4 +206,15 @@ public final class ExecutionControls {
     // return only if injection was meant for this drillbit
     return injection.isValidForBit(endpoint) ? injection : null;
   }
+
+  /**
+   * This method resumes all pauses within the current context (QueryContext or FragmentContext).
+   */
+  public void unpauseAll() {
+    for (final Injection injection : controls.values()) {
+      if (injection instanceof PauseInjection) {
+        ((PauseInjection) injection).unpause();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 4b1cd0c..05f8433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -75,12 +75,11 @@ public class ExecutionControlsInjector {
    * @param desc              the site description
    *                          throws the exception specified by the injection, if it is time
    */
-  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+  public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
     final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
     if (exceptionInjection != null) {
       exceptionInjection.throwUnchecked();
     }
-    return this;
   }
 
   /**
@@ -95,13 +94,12 @@ public class ExecutionControlsInjector {
    * @param exceptionClass    the expected class of the exception (or a super class of it)
    * @throws T the exception specified by the injection, if it is time
    */
-  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+  public <T extends Throwable> void injectChecked(
     final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
     final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
     if (exceptionInjection != null) {
       exceptionInjection.throwChecked(exceptionClass);
     }
-    return this;
   }
 
   /**
@@ -114,7 +112,7 @@ public class ExecutionControlsInjector {
    * @param desc              the site description
    * @param logger            logger of the class containing the injection site
    */
-  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+  public void injectPause(final ExecutionControls executionControls, final String desc,
                                                final Logger logger) {
     final PauseInjection pauseInjection =
       executionControls.lookupPauseInjection(this, desc);
@@ -124,6 +122,9 @@ public class ExecutionControlsInjector {
       pauseInjection.pause();
       logger.debug("Resuming at {}", desc);
     }
-    return this;
+  }
+
+  public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+    return executionControls.lookupCountDownLatchInjection(this, desc);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index 96fed3a..08ade51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -28,8 +28,8 @@ public abstract class Injection {
 
   protected final String address;  // the address of the drillbit on which to inject
   protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
-  private final Class<?> siteClass; // the class where the injection should happen
-  private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+  protected final Class<?> siteClass; // the class where the injection should happen
+  protected final String desc; // description of the injection site; useful for multiple exception injections in a single class
   private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
   private final AtomicInteger nFire;  // the number of times to do the injection, after any skips; starts > 0
 
@@ -64,7 +64,7 @@ public abstract class Injection {
    *
    * @return if the injection should be injected now
    */
-  protected final boolean injectNow() {
+  protected boolean injectNow() {
     return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 80d9790..33ab783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.testing;
 import org.slf4j.Logger;
 
 /**
- * An injector that does not inject any controls.
+ * An injector that does not inject any controls, useful when not testing (i.e. assertions are not enabled).
  */
 public final class NoOpControlsInjector extends ExecutionControlsInjector {
 
@@ -29,20 +29,42 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
   }
 
   @Override
-  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
-    return this;
+  public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
   }
 
   @Override
-  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+  public <T extends Throwable> void injectChecked(
     final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
-    return this;
   }
 
   @Override
-  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+  public void injectPause(final ExecutionControls executionControls, final String desc,
                                                final Logger logger) {
-    return this;
   }
 
+  /**
+   * A count down latch that does nothing; this is the latch handed out when assertions are not enabled.
+   */
+  public static final CountDownLatchInjection LATCH = new CountDownLatchInjection() {
+    @Override
+    public void initialize(int count) {
+    }
+
+    @Override
+    public void await() {
+    }
+
+    @Override
+    public void awaitUninterruptibly() {
+    }
+
+    @Override
+    public void countDown() {
+    }
+  };
+
+  @Override
+  public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+    return LATCH;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index e5f9c9c..ff0340b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -21,43 +21,40 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 
 /**
- * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
- * injected pauses; these pauses are specified via
+ * Injection for a single pause. Pause indefinitely until signalled. This class is used internally for tracking
+ * injected pauses. Note that pauses can be fired only once; nFire field is ignored. These pauses are specified via
  * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
  *
- * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ * After the pauses are set, the user sends another signal to unpause all the pauses. This triggers the Foreman to
+ * 1) unpause all pauses in QueryContext, and
+ * 2) send an unpause signal to all fragments, each of which unpauses all pauses in FragmentContext.
  */
 @JsonAutoDetect(fieldVisibility = Visibility.ANY)
 public class PauseInjection extends Injection {
 
-  private final long millis;
+  private final ExtendedLatch latch = new ExtendedLatch(1);
 
   @JsonCreator // ensures instances are created only through JSON
   private PauseInjection(@JsonProperty("address") final String address,
                          @JsonProperty("port") final int port,
                          @JsonProperty("siteClass") final String siteClass,
                          @JsonProperty("desc") final String desc,
-                         @JsonProperty("nSkip") final int nSkip,
-                         @JsonProperty("nFire") final int nFire,
-                         @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
-    super(address, port, siteClass, desc, nSkip, nFire);
-    if (millis <= 0) {
-      throw new InjectionConfigurationException("Pause millis is non-positive.");
-    }
-    this.millis = millis;
+                         @JsonProperty("nSkip") final int nSkip) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, 1);
   }
 
   public void pause() {
-    if (! injectNow()) {
+    if (!injectNow()) {
       return;
     }
-    try {
-      Thread.sleep(millis);
-    } catch (InterruptedException e) {
-      throw new DrillRuntimeException("Well, I should be sleeping.");
-    }
+    latch.awaitUninterruptibly();
+  }
+
+  public void unpause() {
+    latch.countDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 3e4f3d1..f2352e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.QueryManager;
@@ -116,7 +115,7 @@ public class WorkManager implements AutoCloseable {
 
 
     // TODO references to this escape here (via WorkerBee) before construction is done
-    controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
+    controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
     userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
     statusThread = new StatusThread();
     dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
deleted file mode 100644
index b6c6852..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-import static org.apache.drill.exec.rpc.RpcBus.get;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConstants;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.UserRpcException;
-import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-
-public class ControlHandlerImpl implements ControlMessageHandler {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-  private final WorkerBee bee;
-
-  public ControlHandlerImpl(final WorkerBee bee) {
-    this.bee = bee;
-  }
-
-  @Override
-  public Response handle(final ControlConnection connection, final int rpcType,
-      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Received bit com message of type {}", rpcType);
-    }
-
-    switch (rpcType) {
-
-    case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
-      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
-      cancelFragment(handle);
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
-      final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
-      receivingFragmentFinished(finishedReceiver);
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
-      // TODO: Support a type of message that has no response.
-      return DataRpcConfig.OK;
-
-    case RpcType.REQ_QUERY_CANCEL_VALUE: {
-      final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman != null) {
-        foreman.cancel();
-        return DataRpcConfig.OK;
-      } else {
-        return DataRpcConfig.FAIL;
-      }
-    }
-
-    case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
-      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
-      for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewRemoteFragment(fragments.getFragment(i));
-      }
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_QUERY_STATUS_VALUE: {
-      final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman == null) {
-        throw new RpcException("Query not running on node.");
-      }
-      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
-      return new Response(RpcType.RESP_QUERY_STATUS, profile);
-    }
-
-    default:
-      throw new RpcException("Not yet supported.");
-    }
-  }
-
-  @Override
-  public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
-    logger.debug("Received remote fragment start instruction", fragment);
-
-    final DrillbitContext drillbitContext = bee.getContext();
-    try {
-      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
-      if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
-            drillbitContext.getFunctionImplementationRegistry());
-        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
-        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
-        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
-            fragment.getFragmentJson());
-        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
-        bee.addFragmentRunner(fr);
-      } else {
-        // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
-        drillbitContext.getWorkBus().addFragmentManager(manager);
-      }
-
-    } catch (final Exception e) {
-        throw new UserRpcException(drillbitContext.getEndpoint(),
-            "Failure while trying to start remote fragment", e);
-    } catch (final OutOfMemoryError t) {
-      if (t.getMessage().startsWith("Direct buffer")) {
-        throw new UserRpcException(drillbitContext.getEndpoint(),
-            "Out of direct memory while trying to start remote fragment", t);
-      } else {
-        throw t;
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
-   */
-  @Override
-  public Ack cancelFragment(final FragmentHandle handle) {
-    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
-    if (manager != null) {
-      // try remote fragment cancel.
-      manager.cancel();
-    } else {
-      // then try local cancel.
-      final FragmentExecutor runner = bee.getFragmentRunner(handle);
-      if (runner != null) {
-        runner.cancel();
-      }
-    }
-
-    return Acks.OK;
-  }
-
-  private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
-
-    final FragmentManager manager =
-        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
-
-    FragmentExecutor executor;
-    if (manager != null) {
-      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
-    } else {
-      // then try local cancel.
-      executor = bee.getFragmentRunner(finishedReceiver.getSender());
-      if (executor != null) {
-        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
-      } else {
-        logger.warn(
-            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
-            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
-            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
-      }
-    }
-
-    return Acks.OK;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index c5d78cc..d12e6d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,24 +17,207 @@
  */
 package org.apache.drill.exec.work.batch;
 
+import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
+import org.apache.drill.exec.rpc.control.ControlRpcConfig;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
-public interface ControlMessageHandler {
+public class ControlMessageHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
+  private final WorkerBee bee;
 
-  public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
-      throws RpcException;
+  public ControlMessageHandler(final WorkerBee bee) {
+    this.bee = bee;
+  }
 
-  public abstract void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException;
+  public Response handle(final ControlConnection connection, final int rpcType,
+      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Received bit com message of type {}", rpcType);
+    }
 
-  public abstract Ack cancelFragment(FragmentHandle handle);
+    switch (rpcType) {
 
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      cancelFragment(handle);
+      return ControlRpcConfig.OK;
+    }
 
-}
\ No newline at end of file
+    case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+      final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+      receivingFragmentFinished(finishedReceiver);
+      return ControlRpcConfig.OK;
+    }
+
+    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+      bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
+      // TODO: Support a type of message that has no response.
+      return ControlRpcConfig.OK;
+
+    case RpcType.REQ_QUERY_CANCEL_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
+      if (foreman != null) {
+        foreman.cancel();
+        return ControlRpcConfig.OK;
+      } else {
+        return ControlRpcConfig.FAIL;
+      }
+    }
+
+    case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+      for(int i = 0; i < fragments.getFragmentCount(); i++) {
+        startNewRemoteFragment(fragments.getFragment(i));
+      }
+      return ControlRpcConfig.OK;
+    }
+
+    case RpcType.REQ_QUERY_STATUS_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
+      if (foreman == null) {
+        throw new RpcException("Query not running on node.");
+      }
+      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+      return new Response(RpcType.RESP_QUERY_STATUS, profile);
+    }
+
+    case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      resumeFragment(handle);
+      return ControlRpcConfig.OK;
+    }
+
+    default:
+      throw new RpcException("Not yet supported.");
+    }
+  }
+
+  private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
+    logger.debug("Received remote fragment start instruction", fragment);
+
+    final DrillbitContext drillbitContext = bee.getContext();
+    try {
+      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+      if (fragment.getLeafFragment()) {
+        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
+            drillbitContext.getFunctionImplementationRegistry());
+        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+            fragment.getFragmentJson());
+        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+        bee.addFragmentRunner(fr);
+      } else {
+        // isIntermediate, store for incoming data.
+        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+        drillbitContext.getWorkBus().addFragmentManager(manager);
+      }
+
+    } catch (final Exception e) {
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Failure while trying to start remote fragment", e);
+    } catch (final OutOfMemoryError t) {
+      if (t.getMessage().startsWith("Direct buffer")) {
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Out of direct memory while trying to start remote fragment", t);
+      } else {
+        throw t;
+      }
+    }
+  }
+
+  /*
+   * Cancels the fragment with the given handle, whether it is still pending or already running.
+   */
+  private Ack cancelFragment(final FragmentHandle handle) {
+    // cancel a pending fragment
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+    if (manager != null) {
+      manager.cancel();
+      return Acks.OK;
+    }
+
+    // cancel a running fragment
+    final FragmentExecutor runner = bee.getFragmentRunner(handle);
+    if (runner != null) {
+      runner.cancel();
+      return Acks.OK;
+    }
+
+    // fragment completed or does not exist
+    logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    return Acks.OK;
+  }
+
+  private Ack resumeFragment(final FragmentHandle handle) {
+    // resume a pending fragment
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+    if (manager != null) {
+      manager.unpause();
+      return Acks.OK;
+    }
+
+    // resume a paused fragment
+    final FragmentExecutor runner = bee.getFragmentRunner(handle);
+    if (runner != null) {
+      runner.unpause();
+      return Acks.OK;
+    }
+
+    // fragment completed or does not exist
+    logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    return Acks.OK;
+  }
+
+  private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
+    final FragmentManager manager =
+        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+
+    FragmentExecutor executor;
+    if (manager != null) {
+      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
+    } else {
+      executor = bee.getFragmentRunner(finishedReceiver.getSender());
+      if (executor != null) {
+        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      } else {
+        logger.warn(
+            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+      }
+    }
+
+    return Acks.OK;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4d403b8..0122ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -119,6 +119,7 @@ public class Foreman implements Runnable {
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
   private volatile QueryState state;
+  private boolean resume = false;
 
   private volatile DistributedLease lease; // used to limit the number of concurrent queries
 
@@ -196,6 +197,18 @@ public class Foreman implements Runnable {
   }
 
   /**
+   * Resume the query. Regardless of the current state, this method sends a resume signal to all fragments.
+   * This method can be called multiple times.
+   */
+  public void resume() {
+    resume = true;
+    // resume all pauses through query context
+    queryContext.getExecutionControls().unpauseAll();
+    // resume all pauses through all fragment contexts
+    queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+  }
+
+  /**
    * Called by execution pool to do query setup, and kick off remote execution.
    *
    * <p>Note that completion of this function is not the end of the Foreman's role
@@ -268,9 +281,20 @@ public class Foreman implements Runnable {
        * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
        * make sure that we can't make things any worse as those events are delivered, but allow
        * any necessary remaining cleanup to proceed.
+       *
+       * Note that cancellations cannot be simulated before this point (although pauses can still be injected),
+       * because the Foreman would wait on the cancelling thread to signal a resume while the cancelling thread
+       * would wait on the Foreman to accept events.
        */
       acceptExternalEvents.countDown();
 
+      // If we received the resume signal before the fragments were set up, the first call did not actually resume
+      // them. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+      if(resume) {
+        resume();
+      }
+      injector.injectPause(queryContext.getExecutionControls(), "foreman-ready", logger);
+
       // restore the thread's original name
       currentThread.setName(originalName);
     }
@@ -375,7 +399,6 @@ public class Foreman implements Runnable {
     drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
     logger.debug("Submitting fragments to run.");
-    injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
 
     // set up the root fragment first so we'll have incoming buffers available.
     setupRootFragment(rootPlanFragment, work.getRootOperator());

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index c4646bd..090a377 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -189,7 +189,8 @@ public class QueryManager {
           final DrillbitEndpoint endpoint = data.getEndpoint();
           final FragmentHandle handle = data.getHandle();
           // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
-          controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+          controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+            SignalListener.Signal.CANCEL), handle);
         }
         break;
 
@@ -203,25 +204,52 @@ public class QueryManager {
     }
   }
 
+  /**
+   * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
+   * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
+   * control tunnel.
+   */
+  void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+    if (rootRunner != null) {
+      rootRunner.unpause();
+    }
+    final Controller controller = drillbitContext.getController();
+    for(final FragmentData data : fragmentDataSet) {
+      final DrillbitEndpoint endpoint = data.getEndpoint();
+      final FragmentHandle handle = data.getHandle();
+      controller.getTunnel(endpoint).resumeFragment(new SignalListener(endpoint, handle,
+        SignalListener.Signal.UNPAUSE), handle);
+    }
+  }
+
   /*
    * This assumes that the FragmentStatusListener implementation takes action when it hears
-   * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
    * but log messages.
    */
-  private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
-    public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+  private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
+    /**
+     * An enum of possible signals that {@link SignalListener} listens to.
+     */
+    public static enum Signal { CANCEL, UNPAUSE }
+
+    private final Signal signal;
+
+    public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
       super(endpoint, handle);
+      this.signal = signal;
     }
 
     @Override
     public void failed(final RpcException ex) {
-      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+      logger.error("Failure while attempting to {} fragment {} on endpoint {}.", signal, value, endpoint, ex);
     }
 
     @Override
-    public void success(final Ack value, final ByteBuf buf) {
-      if (!value.getOk()) {
-        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+    public void success(final Ack ack, final ByteBuf buf) {
+      if (!ack.getOk()) {
+        logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
+          ack);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7baafc4..24e2556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -108,7 +108,7 @@ public class FragmentExecutor implements Runnable {
 
   /**
    * Cancel the execution of this fragment if it is in an appropriate state. Messages come from external.
-   * NOTE that this can be called from threads *other* than the one running this runnable(),
+   * NOTE that this will be called from threads *other* than the one running this runnable(),
    * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
@@ -140,11 +140,18 @@ public class FragmentExecutor implements Runnable {
   }
 
   /**
+   * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
+   * the one running this runnable(). Also, this method can be called multiple times.
+   */
+  public synchronized void unpause() {
+    fragmentContext.getExecutionControls().unpauseAll();
+  }
+
+  /**
    * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
    * called in the case that a limit query is executed.
    *
-   * @param handle
-   *          The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+   * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
    */
   public void receivingFragmentFinished(final FragmentHandle handle) {
     acceptExternalEvents.awaitUninterruptibly();
@@ -277,6 +284,8 @@ public class FragmentExecutor implements Runnable {
 
     // first close the operators and release all memory.
     try {
+      // If the executor was cancelled before setup, root is still uninitialized when the executor actually runs,
+      // yet this method is called from a finally block. So root can be null.
       if (root != null) {
         root.close();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 0ba91b4..ad880da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -49,6 +49,12 @@ public interface FragmentManager {
 
   public abstract void cancel();
 
+  /**
+   * If the executor is paused (for testing), this method should unpause the executor. This method should handle
+   * multiple calls.
+   */
+  public abstract void unpause();
+
   public boolean isWaiting();
 
   public abstract FragmentHandle getHandle();

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index f526fbe..ca5d5b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -112,6 +112,11 @@ public class NonRootFragmentManager implements FragmentManager {
   }
 
   @Override
+  public void unpause() {
+    runner.unpause();
+  }
+
+  @Override
   public FragmentHandle getHandle() {
     return fragment.getHandle();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b1c3fe0..67ef9b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -29,8 +29,8 @@ import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 // TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+public class RootFragmentManager implements FragmentManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
   private final IncomingBuffers buffers;
   private final FragmentExecutor runner;
@@ -70,6 +70,11 @@ public class RootFragmentManager implements FragmentManager{
   }
 
   @Override
+  public void unpause() {
+    runner.unpause();
+  }
+
+  @Override
   public boolean isWaiting() {
     return !buffers.isDone() && !cancel;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 8854ef3..e8deb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -68,6 +68,14 @@ public class UserWorker{
     return Acks.OK;
   }
 
+  public Ack resumeQuery(final QueryId queryId) {
+    final Foreman foreman = bee.getForemanForQueryId(queryId);
+    if (foreman != null) {
+      foreman.resume();
+    }
+    return Acks.OK;
+  }
+
   public OptionManager getSystemOptions() {
     return bee.getContext().getOptionManager();
   }