You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/10 22:45:06 UTC

[1/3] drill git commit: DRILL-2697: Pause sites wait indefinitely for a resume signal. DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the ...

Repository: drill
Updated Branches:
  refs/heads/master 4e596334e -> 3a294abcc


http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index da69e9e..3e4dcb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -25,12 +25,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.math3.util.Pair;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -60,7 +61,9 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -68,15 +71,14 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
+ * execution by injecting exceptions at various points and to cancellations in various phases.
+ * The test cases are mentioned in DRILL-2383.
  */
-@Ignore
 public class TestDrillbitResilience {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
 
@@ -90,7 +92,6 @@ public class TestDrillbitResilience {
    * counting sys.drillbits.
    */
   private static final String TEST_QUERY = "select * from sys.memory";
-  private static final long PAUSE_TIME_MILLIS = 3000L;
 
   private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
     if (drillbits.containsKey(name)) {
@@ -184,18 +185,17 @@ public class TestDrillbitResilience {
   }
 
   /**
-   * Clear all exceptions.
+   * Clear all injections.
    */
   private static void clearAllInjections() {
-    assertTrue(drillClient != null);
+    Preconditions.checkNotNull(drillClient);
     ControlsInjectionUtil.clearControls(drillClient);
   }
 
   /**
    * Check that all the drillbits are ok.
    * <p/>
-   * <p>The current implementation does this by counting the number of drillbits using a
-   * query.
+   * <p>The current implementation does this by counting the number of drillbits using a query.
    */
   private static void assertDrillbitsOk() {
       final SingleRowListener listener = new SingleRowListener() {
@@ -245,13 +245,14 @@ public class TestDrillbitResilience {
     try {
       QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
       listener.waitForCompletion();
-      assertTrue(listener.getQueryState() == QueryState.COMPLETED);
+      final QueryState state = listener.getQueryState();
+      assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED);
     } catch (final Exception e) {
       throw new RuntimeException("Couldn't query active drillbits", e);
     }
 
     final List<DrillPBError> errorList = listener.getErrorList();
-    assertTrue(errorList.isEmpty());
+    assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
   }
 
   @SuppressWarnings("static-method")
@@ -262,10 +263,10 @@ public class TestDrillbitResilience {
   }
 
   /**
-   * Set the given exceptions.
+   * Set the given controls.
    */
-  private static void setExceptions(final String controlsString) {
-    ControlsInjectionUtil.setControls(drillClient, controlsString);
+  private static void setControls(final String controls) {
+    ControlsInjectionUtil.setControls(drillClient, controls);
   }
 
   /**
@@ -332,16 +333,16 @@ public class TestDrillbitResilience {
    */
   private static void assertExceptionInjected(final Throwable throwable,
                                               final Class<? extends Throwable> exceptionClass, final String desc) {
-    assertTrue(throwable instanceof UserException);
+    assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
     final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
-    assertEquals(exceptionClass.getName(), cause.getExceptionClass());
-    assertEquals(desc, cause.getMessage());
+    assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
+    assertEquals("Exception sites should match.", desc, cause.getMessage());
   }
 
   @Test
   public void settingNoopInjectionsAndQuery() {
     final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
-    setExceptions(controls);
+    setControls(controls);
     try {
       QueryTestUtil.test(drillClient, TEST_QUERY);
     } catch (final Exception e) {
@@ -357,7 +358,7 @@ public class TestDrillbitResilience {
    */
   private static void testForeman(final String desc) {
     final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
-    setExceptions(controls);
+    setControls(controls);
     try {
       QueryTestUtil.test(drillClient, TEST_QUERY);
       fail();
@@ -372,32 +373,39 @@ public class TestDrillbitResilience {
     testForeman("run-try-beginning");
   }
 
-  /*
-   * TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way
-   * that it gates input via stateListener. That could be tricky, since some results could be
-   * queued up before Foreman has gotten through it's run(), and they would all have to be sent
-   * before the gate is opened. There's also the question of what to do in case we detect failure
-   * there after some data has been sent. Right now, this test doesn't work because that's
-   * exactly what happens, and the client believes that the query succeeded, even though an exception
-   * was thrown after setup completed, but data was asynchronously sent to the client before that.
-   * This test also revealed that the QueryState never seems to make it to the client, so we can't
-   * detect the failure that way (see SingleRowListener's getQueryState(), which I originally tried
-   * to use here to detect query completion).
-   */
   @SuppressWarnings("static-method")
   @Test
   public void foreman_runTryEnd() {
     testForeman("run-try-end");
   }
 
+  /**
+   * Tests can use this listener to wait until the submitted query completes or fails, by
+   * calling {@link #waitForCompletion()}.
+   */
   private static class WaitUntilCompleteListener implements UserResultsListener {
-    protected final CountDownLatch latch;
+    private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
     protected QueryId queryId = null;
-    protected Exception ex = null;
-    protected QueryState state = null;
+    protected volatile Pointer<Exception> ex = new Pointer<>();
+    protected volatile QueryState state = null;
+
+    /**
+     * Method that sets the exception if the condition is not met.
+     */
+    protected final void check(final boolean condition, final String format, final Object... args) {
+      if (!condition) {
+        ex.value = new IllegalStateException(String.format(format, args));
+      }
+    }
 
-    public WaitUntilCompleteListener(final int count) {
-      latch = new CountDownLatch(count);
+    /**
+     * Method that cancels and resumes the query, in order.
+     */
+    protected final void cancelAndResume() {
+      Preconditions.checkNotNull(queryId);
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      (new CancellingThread(queryId, ex, trigger)).start();
+      (new ResumingThread(queryId, ex, trigger)).start();
     }
 
     @Override
@@ -407,7 +415,7 @@ public class TestDrillbitResilience {
 
     @Override
     public void submissionFailed(final UserException ex) {
-      this.ex = ex;
+      this.ex.value = ex;
       state = QueryState.FAILED;
       latch.countDown();
     }
@@ -424,21 +432,23 @@ public class TestDrillbitResilience {
     }
 
     public final Pair<QueryState, Exception> waitForCompletion() {
-      try {
-        latch.await();
-      } catch (final InterruptedException e) {
-        return new Pair<QueryState, Exception>(state, e);
-      }
-      return new Pair<>(state, ex);
+      latch.awaitUninterruptibly();
+      return new Pair<>(state, ex.value);
     }
   }
 
+  /**
+   * Thread that cancels the given query id. After the cancel is acknowledged or fails, the latch is counted down.
+   */
   private static class CancellingThread extends Thread {
-
     private final QueryId queryId;
+    private final Pointer<Exception> ex;
+    private final ExtendedLatch latch;
 
-    public CancellingThread(final QueryId queryId) {
+    public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
       this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
     }
 
     @Override
@@ -446,139 +456,178 @@ public class TestDrillbitResilience {
       final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
       try {
         cancelAck.checkedGet();
-      } catch (final RpcException e) {
-        fail(e.getMessage()); // currently this failure does not fail the test
+      } catch (final RpcException ex) {
+        this.ex.value = ex;
       }
+      latch.countDown();
+    }
+  }
+
+  /**
+   * Thread that resumes the given query id. The thread waits uninterruptibly until the latch is counted down, and
+   * then sends the resume signal.
+   */
+  private static class ResumingThread extends Thread {
+    private final QueryId queryId;
+    private final Pointer<Exception> ex;
+    private final ExtendedLatch latch;
+
+    public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
+      this.queryId = queryId;
+      this.ex = ex;
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId);
+      try {
+        resumeAck.checkedGet();
+      } catch (final RpcException ex) {
+        this.ex.value = ex;
+      }
+    }
+  }
+
+  /**
+   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the state is not
+   * as expected or if the result carries an exception.
+   */
+  private static void assertCompleteState(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+    final QueryState actualState = result.getFirst();
+    final Exception exception = result.getSecond();
+    if (actualState != expectedState || exception != null) {
+      fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
+        expectedState, actualState, exception == null ? "none." : exception));
     }
   }
 
   /**
    * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
    */
-  private static void assertCancelled(final String controls, final WaitUntilCompleteListener listener) {
-    ControlsInjectionUtil.setControls(drillClient, controls);
+  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    setControls(controls);
 
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(String.format("Expected Query Outcome of CANCELED but had Outcome of %s", result.getFirst()),
-        result.getFirst() == QueryState.CANCELED);
-    assertTrue(String.format("Expected no Exception but had Exception %s", result.getSecond()),
-        result.getSecond() == null);
+    assertCompleteState(result, QueryState.CANCELED);
   }
 
-  @Test // Cancellation TC 1
-  public void cancelBeforeAnyResultsArrive() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+  private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
+    return "{\"injections\" : [{"
+      + "\"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + siteDesc + "\","
+      + "\"nSkip\" : " + nSkip
+      + "}]}";
+  }
+
+  private static String createPauseInjection(final Class siteClass, final String siteDesc) {
+    return createPauseInjection(siteClass, siteDesc, 0);
+  }
 
+  @Test // To test pause and resume. Test hangs if resume did not happen.
+  public void passThrough() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       @Override
       public void queryIdArrived(final QueryId queryId) {
-        (new CancellingThread(queryId)).start();
+        super.queryIdArrived(queryId);
+        final ExtendedLatch trigger = new ExtendedLatch(1);
+        (new ResumingThread(queryId, ex, trigger)).start();
+        trigger.countDown();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + Foreman.class.getName() + "\","
-      + "\"desc\":\"pause-run-plan\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
+    final String controls = createPauseInjection(PojoRecordReader.class, "read-next");
+    setControls(controls);
+
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertCompleteState(result, QueryState.COMPLETED);
+  }
+
+  @Test // Cancellation TC 1: cancel before any result set is returned
+  public void cancelBeforeAnyResultsArrive() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
 
-    assertCancelled(controls, listener);
+      @Override
+      public void queryIdArrived(final QueryId queryId) {
+        super.queryIdArrived(queryId);
+        cancelAndResume();
+      }
+    };
+
+    final String controls = createPauseInjection(Foreman.class, "foreman-ready");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Cancellation TC 2
+  @Test // Cancellation TC 2: cancel in the middle of fetching result set
   public void cancelInMiddleOfFetchingResults() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private boolean cancelRequested = false;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-        if (! cancelRequested) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+        if (!cancelRequested) {
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
           cancelRequested = true;
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
-      + "\"desc\":\"sending-data\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
+    final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+    assertCancelledWithoutException(controls, listener);
   }
 
 
-  @Test // Cancellation TC 3
+  @Test // Cancellation TC 3: cancel after all result sets are produced but not all are fetched
   public void cancelAfterAllResultsProduced() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
         if (++count == drillbits.size()) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
-      + "\"desc\":\"send-complete\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Cancellation TC 4
+  @Test // Cancellation TC 4: cancel after everything is completed and fetched
   public void cancelAfterEverythingIsCompleted() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       private int count = 0;
 
       @Override
-      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
         if (++count == drillbits.size()) {
-          assertTrue(queryId != null);
-          (new CancellingThread(queryId)).start();
+          check(queryId != null, "Query id should not be null, since we have waited long enough.");
+          cancelAndResume();
         }
         result.release();
       }
     };
 
-    final String controls = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"" + Foreman.class.getName() + "\","
-      + "\"desc\":\"foreman-cleanup\","
-      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
-      + "}]}";
-
-    assertCancelled(controls, listener);
+    final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
+    assertCancelledWithoutException(controls, listener);
   }
 
-  @Test // Completion TC 1
+  @Test // Completion TC 1: success
   public void successfullyCompletes() {
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
-    QueryTestUtil.testWithListener(
-      drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(result.getFirst() == QueryState.COMPLETED);
-    assertTrue(result.getSecond() == null);
+    assertCompleteState(result, QueryState.COMPLETED);
   }
 
   /**
@@ -586,16 +635,16 @@ public class TestDrillbitResilience {
    */
   private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
                                                final String exceptionDesc) {
-    setExceptions(controls);
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL,  TEST_QUERY, listener);
+    setControls(controls);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(result.getFirst() == QueryState.FAILED);
-    final Exception e = result.getSecond();
-    assertExceptionInjected(e, exceptionClass, exceptionDesc);
+    final QueryState state = result.getFirst();
+    assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
+    assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 2
+  @Test // Completion TC 2: failed query - before query is executed - while sql parsing
   public void failsWhenParsing() {
     final String exceptionDesc = "sql-parsing";
     final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
@@ -603,7 +652,7 @@ public class TestDrillbitResilience {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 3
+  @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
   public void failsWhenSendingFragments() {
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
@@ -611,7 +660,7 @@ public class TestDrillbitResilience {
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
 
-  @Test // Completion TC 4
+  @Test // Completion TC 4: failed query - during query execution
   public void failsDuringExecution() {
     final String exceptionDesc = "fragment-execution";
     final Class<? extends Throwable> exceptionClass = IOException.class;

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
new file mode 100644
index 0000000..c98f54c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.util.Pointer;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCountDownLatchInjection extends BaseTestQuery {
+
+  private static final UserSession session = UserSession.Builder.newBuilder()
+    .withCredentials(UserCredentials.newBuilder()
+      .setUserName("foo")
+      .build())
+    .withUserProperties(UserProperties.getDefaultInstance())
+    .withOptionManager(bits[0].getContext().getOptionManager())
+    .build();
+
+  /**
+   * Class in whose methods we want to simulate count down latches at run-time for testing
+   * purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
+   * {@link org.apache.drill.exec.ops.FragmentContext}.
+   */
+  private static class DummyClass {
+    private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+
+    private final QueryContext context;
+    private final CountDownLatch latch;
+    private final int count;
+
+    public DummyClass(final QueryContext context, final CountDownLatch latch, final int count) {
+      this.context = context;
+      this.latch = latch;
+      this.count = count;
+    }
+
+    public static final String LATCH_NAME = "<<latch>>";
+
+    /**
+     * Initializes the injected latch with "count" and waits for that many count downs (one per spawned thread).
+     */
+    public long initAndWait() throws InterruptedException {
+      // ... code ...
+
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).initialize(count);
+
+      // ... code ...
+      latch.countDown(); // trigger threads spawn
+
+      final long startTime = System.currentTimeMillis();
+      // simulated wait for "count" threads to count down on the same latch
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).await();
+      final long endTime = System.currentTimeMillis();
+      // ... code ...
+      return (endTime - startTime);
+    }
+
+    public void countDown() {
+      // ... code ...
+      injector.getLatch(context.getExecutionControls(), LATCH_NAME).countDown();
+      // ... code ...
+    }
+  }
+
+  private static class ThreadCreator extends Thread {
+
+    private final DummyClass dummyClass;
+    private final ExtendedLatch latch;
+    private final int count;
+    private final Pointer<Long> countingDownTime;
+
+    public ThreadCreator(final DummyClass dummyClass, final ExtendedLatch latch, final int count,
+                         final Pointer<Long> countingDownTime) {
+      this.dummyClass = dummyClass;
+      this.latch = latch;
+      this.count = count;
+      this.countingDownTime = countingDownTime;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      final long startTime = System.currentTimeMillis();
+      for (int i = 0; i < count; i++) {
+        (new Thread() {
+          @Override
+          public void run() {
+            dummyClass.countDown();
+          }
+        }).start();
+      }
+      final long endTime = System.currentTimeMillis();
+      countingDownTime.value = (endTime - startTime);
+    }
+  }
+
+  @Test // test would hang if the correct init, wait and countdowns did not happen, and the test timeout mechanism will
+  // catch that case
+  public void latchInjected() {
+    final int threads = 10;
+    final ExtendedLatch trigger = new ExtendedLatch(1);
+    final Pointer<Long> countingDownTime = new Pointer<>();
+
+    final String jsonString = "{\"injections\":[{"
+      + "\"type\":\"latch\"," +
+      "\"siteClass\":\"org.apache.drill.exec.testing.TestCountDownLatchInjection$DummyClass\","
+      + "\"desc\":\"" + DummyClass.LATCH_NAME + "\""
+      + "}]}";
+
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+
+    final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
+    (new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();
+    final long timeSpentWaiting;
+    try {
+      timeSpentWaiting = dummyClass.initAndWait();
+    } catch (final InterruptedException e) {
+      fail("Thread should not be interrupted; there is no deliberate attempt.");
+      return;
+    }
+    assertTrue(timeSpentWaiting >= countingDownTime.value);
+    try {
+      queryContext.close();
+    } catch (final Exception e) {
+      fail("Failed to close query context: " + e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 5fa2b3f..ba29c58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -18,20 +18,33 @@
 package org.apache.drill.exec.testing;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.Pointer;
 import org.junit.Test;
 import org.slf4j.Logger;
 
+import java.util.concurrent.CountDownLatch;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestPauseInjection extends BaseTestQuery {
 
   private static final UserSession session = UserSession.Builder.newBuilder()
-      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+      .withCredentials(UserCredentials.newBuilder()
+        .setUserName("foo")
+        .build())
       .withUserProperties(UserProperties.getDefaultInstance())
       .withOptionManager(bits[0].getContext().getOptionManager())
       .build();
@@ -46,9 +59,11 @@ public class TestPauseInjection extends BaseTestQuery {
     private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
 
     private final QueryContext context;
+    private final CountDownLatch latch;
 
-    public DummyClass(final QueryContext context) {
+    public DummyClass(final QueryContext context, final CountDownLatch latch) {
       this.context = context;
+      this.latch = latch;
     }
 
     public static final String PAUSES = "<<pauses>>";
@@ -61,6 +76,7 @@ public class TestPauseInjection extends BaseTestQuery {
     public long pauses() {
       // ... code ...
 
+      latch.countDown();
       final long startTime = System.currentTimeMillis();
       // simulated pause
       injector.injectPause(context.getExecutionControls(), PAUSES, logger);
@@ -71,30 +87,136 @@ public class TestPauseInjection extends BaseTestQuery {
     }
   }
 
+  private static class ResumingThread extends Thread {
+
+    private final QueryContext context;
+    private final ExtendedLatch latch;
+    private final Pointer<Exception> ex;
+    private final long millis;
+
+    public ResumingThread(final QueryContext context, final ExtendedLatch latch, final Pointer<Exception> ex,
+                          final long millis) {
+      this.context = context;
+      this.latch = latch;
+      this.ex = ex;
+      this.millis = millis;
+    }
+
+    @Override
+    public void run() {
+      latch.awaitUninterruptibly();
+      try {
+        Thread.sleep(millis);
+      } catch (final InterruptedException ex) {
+        this.ex.value = ex;
+      }
+      context.getExecutionControls().unpauseAll();
+    }
+  }
+
   @Test
   public void pauseInjected() {
-    final long pauseMillis = 1000L;
+    final long expectedDuration = 1000L;
+    final ExtendedLatch trigger = new ExtendedLatch(1);
+    final Pointer<Exception> ex = new Pointer<>();
+
     final String jsonString = "{\"injections\":[{"
       + "\"type\":\"pause\"," +
       "\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
       + "\"desc\":\"" + DummyClass.PAUSES + "\","
-      + "\"millis\":" + pauseMillis + ","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1"
+      + "\"nSkip\":0"
       + "}]}";
 
     ControlsInjectionUtil.setControls(session, jsonString);
 
     final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
 
+    (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
     // test that the pause happens
-    final DummyClass dummyClass = new DummyClass(queryContext);
-    final long time = dummyClass.pauses();
-    assertTrue((time >= pauseMillis));
+    final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+    final long actualDuration = dummyClass.pauses();
+    assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+      expectedDuration <= actualDuration);
+    assertTrue("No exception should be thrown.", ex.value == null);
     try {
       queryContext.close();
-    } catch (Exception e) {
-      fail();
+    } catch (final Exception e) {
+      fail("Failed to close query context: " + e);
+    }
+  }
+
+  @Test
+  public void pauseOnSpecificBit() {
+    final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+    final ZookeeperHelper zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
+
+    // Creating two drillbits
+    final Drillbit drillbit1, drillbit2;
+    final DrillConfig drillConfig = zkHelper.getConfig();
+    try {
+      drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+      drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+    } catch (final DrillbitStartupException e) {
+      throw new RuntimeException("Failed to start two drillbits.", e);
+    }
+
+    final DrillbitContext drillbitContext1 = drillbit1.getContext();
+    final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+    final UserSession session = UserSession.Builder.newBuilder()
+      .withCredentials(UserCredentials.newBuilder()
+        .setUserName("foo")
+        .build())
+      .withUserProperties(UserProperties.getDefaultInstance())
+      .withOptionManager(drillbitContext1.getOptionManager())
+      .build();
+
+    final DrillbitEndpoint drillbitEndpoint1 = drillbitContext1.getEndpoint();
+    final String jsonString = "{\"injections\":[{"
+      + "\"type\" : \"pause\"," +
+      "\"siteClass\" : \"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+      + "\"desc\" : \"" + DummyClass.PAUSES + "\","
+      + "\"nSkip\" : 0, "
+      + "\"address\" : \"" + drillbitEndpoint1.getAddress() + "\","
+      + "\"port\" : " + drillbitEndpoint1.getUserPort()
+      + "}]}";
+
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    {
+      final long expectedDuration = 1000L;
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      final Pointer<Exception> ex = new Pointer<>();
+      final QueryContext queryContext = new QueryContext(session, drillbitContext1);
+      (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
+      // test that the pause happens
+      final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+      final long actualDuration = dummyClass.pauses();
+      assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+        expectedDuration <= actualDuration);
+      assertTrue("No exception should be thrown.", ex.value == null);
+      try {
+        queryContext.close();
+      } catch (final Exception e) {
+        fail("Failed to close query context: " + e);
+      }
+    }
+
+    {
+      final ExtendedLatch trigger = new ExtendedLatch(1);
+      final QueryContext queryContext = new QueryContext(session, drillbitContext2);
+
+      // if the resume did not happen, the test would hang
+      final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+      dummyClass.pauses();
+      try {
+        queryContext.close();
+      } catch (final Exception e) {
+        fail("Failed to close query context: " + e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index 470e976..b428337 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -47,13 +47,13 @@ public final class BitControl {
      */
     GOODBYE(2, 2),
     /**
-     * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+     * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
      *
      * <pre>
      * bit requests
      * </pre>
      */
-    REQ_INIATILIZE_FRAGMENTS(3, 3),
+    REQ_INITIALIZE_FRAGMENTS(3, 3),
     /**
      * <code>REQ_CANCEL_FRAGMENT = 6;</code>
      *
@@ -91,25 +91,33 @@ public final class BitControl {
      */
     REQ_QUERY_CANCEL(9, 15),
     /**
+     * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+     *
+     * <pre>
+     * send a resume message for a fragment, returns Ack
+     * </pre>
+     */
+    REQ_UNPAUSE_FRAGMENT(10, 16),
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
      * bit responses
      * </pre>
      */
-    RESP_FRAGMENT_HANDLE(10, 11),
+    RESP_FRAGMENT_HANDLE(11, 11),
     /**
      * <code>RESP_FRAGMENT_STATUS = 12;</code>
      */
-    RESP_FRAGMENT_STATUS(11, 12),
+    RESP_FRAGMENT_STATUS(12, 12),
     /**
      * <code>RESP_BIT_STATUS = 13;</code>
      */
-    RESP_BIT_STATUS(12, 13),
+    RESP_BIT_STATUS(13, 13),
     /**
      * <code>RESP_QUERY_STATUS = 14;</code>
      */
-    RESP_QUERY_STATUS(13, 14),
+    RESP_QUERY_STATUS(14, 14),
     ;
 
     /**
@@ -125,13 +133,13 @@ public final class BitControl {
      */
     public static final int GOODBYE_VALUE = 2;
     /**
-     * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+     * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
      *
      * <pre>
      * bit requests
      * </pre>
      */
-    public static final int REQ_INIATILIZE_FRAGMENTS_VALUE = 3;
+    public static final int REQ_INITIALIZE_FRAGMENTS_VALUE = 3;
     /**
      * <code>REQ_CANCEL_FRAGMENT = 6;</code>
      *
@@ -169,6 +177,14 @@ public final class BitControl {
      */
     public static final int REQ_QUERY_CANCEL_VALUE = 15;
     /**
+     * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+     *
+     * <pre>
+     * send a resume message for a fragment, returns Ack
+     * </pre>
+     */
+    public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16;
+    /**
      * <code>RESP_FRAGMENT_HANDLE = 11;</code>
      *
      * <pre>
@@ -197,13 +213,14 @@ public final class BitControl {
         case 0: return HANDSHAKE;
         case 1: return ACK;
         case 2: return GOODBYE;
-        case 3: return REQ_INIATILIZE_FRAGMENTS;
+        case 3: return REQ_INITIALIZE_FRAGMENTS;
         case 6: return REQ_CANCEL_FRAGMENT;
         case 7: return REQ_RECEIVER_FINISHED;
         case 8: return REQ_FRAGMENT_STATUS;
         case 9: return REQ_BIT_STATUS;
         case 10: return REQ_QUERY_STATUS;
         case 15: return REQ_QUERY_CANCEL;
+        case 16: return REQ_UNPAUSE_FRAGMENT;
         case 11: return RESP_FRAGMENT_HANDLE;
         case 12: return RESP_FRAGMENT_STATUS;
         case 13: return RESP_BIT_STATUS;
@@ -7395,16 +7412,17 @@ public final class BitControl {
       "oint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_tim" +
       "e\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030",
       "\001 \001(\0132\030.exec.bit.FragmentHandle\022(\n\006sende" +
-      "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\271\002\n\007Rp" +
+      "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\323\002\n\007Rp" +
       "cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE" +
-      "\020\002\022\034\n\030REQ_INIATILIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
+      "\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
       "CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISH" +
       "ED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT" +
       "_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_Q" +
-      "UERY_CANCEL\020\017\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022" +
-      "\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_ST" +
-      "ATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016B+\n\033org.apa",
-      "che.drill.exec.protoB\nBitControlH\001"
+      "UERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022" +
+      "\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP_FRAGME" +
+      "NT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP",
+      "_QUERY_STATUS\020\016B+\n\033org.apache.drill.exec" +
+      ".protoB\nBitControlH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index c3ff58b..afe8bfe 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -67,25 +67,33 @@ public final class UserProtos {
      */
     REQUEST_RESULTS(5, 5),
     /**
+     * <code>RESUME_PAUSED_QUERY = 11;</code>
+     *
+     * <pre>
+     * user is sending a query resume request to the drillbit
+     * </pre>
+     */
+    RESUME_PAUSED_QUERY(6, 11),
+    /**
      * <code>QUERY_DATA = 6;</code>
      *
      * <pre>
      * bit to user
      * </pre>
      */
-    QUERY_DATA(6, 6),
+    QUERY_DATA(7, 6),
     /**
      * <code>QUERY_HANDLE = 7;</code>
      */
-    QUERY_HANDLE(7, 7),
+    QUERY_HANDLE(8, 7),
     /**
      * <code>REQ_META_FUNCTIONS = 8;</code>
      */
-    REQ_META_FUNCTIONS(8, 8),
+    REQ_META_FUNCTIONS(9, 8),
     /**
      * <code>RESP_FUNCTION_LIST = 9;</code>
      */
-    RESP_FUNCTION_LIST(9, 9),
+    RESP_FUNCTION_LIST(10, 9),
     /**
      * <code>QUERY_RESULT = 10;</code>
      *
@@ -93,7 +101,7 @@ public final class UserProtos {
      * drillbit is reporting a query status change, most likely a terminal message, to the user
      * </pre>
      */
-    QUERY_RESULT(10, 10),
+    QUERY_RESULT(11, 10),
     ;
 
     /**
@@ -129,6 +137,14 @@ public final class UserProtos {
      */
     public static final int REQUEST_RESULTS_VALUE = 5;
     /**
+     * <code>RESUME_PAUSED_QUERY = 11;</code>
+     *
+     * <pre>
+     * user is sending a query resume request to the drillbit
+     * </pre>
+     */
+    public static final int RESUME_PAUSED_QUERY_VALUE = 11;
+    /**
      * <code>QUERY_DATA = 6;</code>
      *
      * <pre>
@@ -168,6 +184,7 @@ public final class UserProtos {
         case 3: return RUN_QUERY;
         case 4: return CANCEL_QUERY;
         case 5: return REQUEST_RESULTS;
+        case 11: return RESUME_PAUSED_QUERY;
         case 6: return QUERY_DATA;
         case 7: return QUERY_HANDLE;
         case 8: return REQ_META_FUNCTIONS;
@@ -4986,16 +5003,17 @@ public final class UserProtos {
       "n\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n\013rpc_ver" +
       "sion\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exec.user.H" +
       "andshakeStatus\022\017\n\007errorId\030\004 \001(\t\022\024\n\014error" +
-      "Message\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
+      "Message\030\005 \001(\t*\341\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
       "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY\020\003\022\020\n" +
-      "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\016\n\n" +
-      "QUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022REQ_ME" +
-      "TA_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIST\020\t\022\020" +
-      "\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMode\022\017\n\013" +
-      "STREAM_FULL\020\001*^\n\017HandshakeStatus\022\013\n\007SUCC" +
-      "ESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022\017\n\013AUTH_" +
-      "FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n\033org.apa" +
-      "che.drill.exec.protoB\nUserProtosH\001"
+      "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\027\n\023" +
+      "RESUME_PAUSED_QUERY\020\013\022\016\n\nQUERY_DATA\020\006\022\020\n" +
+      "\014QUERY_HANDLE\020\007\022\026\n\022REQ_META_FUNCTIONS\020\010\022" +
+      "\026\n\022RESP_FUNCTION_LIST\020\t\022\020\n\014QUERY_RESULT\020" +
+      "\n*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL\020\001*^" +
+      "\n\017HandshakeStatus\022\013\n\007SUCCESS\020\001\022\030\n\024RPC_VE" +
+      "RSION_MISMATCH\020\002\022\017\n\013AUTH_FAILED\020\003\022\023\n\017UNK" +
+      "NOWN_FAILURE\020\004B+\n\033org.apache.drill.exec." +
+      "protoB\nUserProtosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
index 4d03073..6687a86 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
@@ -28,6 +28,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
     RUN_QUERY(3),
     CANCEL_QUERY(4),
     REQUEST_RESULTS(5),
+    RESUME_PAUSED_QUERY(11),
     QUERY_DATA(6),
     QUERY_HANDLE(7),
     REQ_META_FUNCTIONS(8),
@@ -61,6 +62,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
             case 8: return REQ_META_FUNCTIONS;
             case 9: return RESP_FUNCTION_LIST;
             case 10: return QUERY_RESULT;
+            case 11: return RESUME_PAUSED_QUERY;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 93bc33c..c9295f0 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -14,25 +14,25 @@ enum RpcType {
   HANDSHAKE = 0;
   ACK = 1;
   GOODBYE = 2;
-    
+
   // bit requests
-  REQ_INIATILIZE_FRAGMENTS = 3; // Returns Handle
-    
+  REQ_INITIALIZE_FRAGMENTS = 3; // Returns Handle
+
   REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
   REQ_RECEIVER_FINISHED = 7;
   REQ_FRAGMENT_STATUS = 8; // send a fragment status, return Ack
   REQ_BIT_STATUS = 9; // get bit status.
   REQ_QUERY_STATUS = 10;
   REQ_QUERY_CANCEL = 15;
-      
-    // bit responses
+  REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+
+  // bit responses
   RESP_FRAGMENT_HANDLE = 11;
   RESP_FRAGMENT_STATUS = 12;
   RESP_BIT_STATUS = 13;
   RESP_QUERY_STATUS = 14;
 }
 
-
 message BitControlHandshake{
   optional int32 rpc_version = 1;
   optional exec.shared.RpcChannel channel = 2 [default = BIT_CONTROL];

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index 185a646..ceed3d8 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -17,6 +17,7 @@ enum RpcType {
   RUN_QUERY = 3; // user is submitting a new query to the drillbit
   CANCEL_QUERY = 4; // user is sending a query cancellation request to the drillbit
   REQUEST_RESULTS = 5;
+  RESUME_PAUSED_QUERY = 11; // user is sending a query resume request to the drillbit
 
   // bit to user
   QUERY_DATA = 6; // drillbit is sending a query result data batch to the user


[2/3] drill git commit: DRILL-2697: Pauses sites wait indefinitely for a resume signal DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the

Posted by ve...@apache.org.
DRILL-2697: Pause sites wait indefinitely for a resume signal. DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the Control layer.

+ Better error messages and more tests in TestDrillbitResilience and TestPauseInjection
+ Added execution controls to operator context
+ Removed ControlMessageHandler interface, renamed ControlHandlerImpl to ControlMessageHandler
+ Added CountDownLatchInjection, useful in cases like PartitionedSender, which spawns multiple threads


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f8e5e613
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f8e5e613
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f8e5e613

Branch: refs/heads/master
Commit: f8e5e613ff001da79377caea9c8e453e5619499a
Parents: 4e59633
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Thu Apr 30 13:27:08 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun May 10 12:24:44 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |  10 +-
 .../apache/drill/exec/ops/OperatorContext.java  |   3 +
 .../drill/exec/ops/OperatorContextImpl.java     |   8 +
 .../drill/exec/physical/impl/ScreenCreator.java |   6 +-
 .../exec/rpc/control/ControlRpcConfig.java      |   4 +-
 .../drill/exec/rpc/control/ControlTunnel.java   |  20 +-
 .../drill/exec/rpc/user/UserRpcConfig.java      |   3 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |   9 +
 .../drill/exec/store/pojo/PojoRecordReader.java |  16 +-
 .../exec/testing/CountDownLatchInjection.java   |  51 ++++
 .../testing/CountDownLatchInjectionImpl.java    |  85 ++++++
 .../drill/exec/testing/ExecutionControls.java   |  37 ++-
 .../exec/testing/ExecutionControlsInjector.java |  13 +-
 .../apache/drill/exec/testing/Injection.java    |   6 +-
 .../exec/testing/NoOpControlsInjector.java      |  36 ++-
 .../drill/exec/testing/PauseInjection.java      |  33 +-
 .../org/apache/drill/exec/work/WorkManager.java |   3 +-
 .../exec/work/batch/ControlHandlerImpl.java     | 197 ------------
 .../exec/work/batch/ControlMessageHandler.java  | 195 +++++++++++-
 .../apache/drill/exec/work/foreman/Foreman.java |  25 +-
 .../drill/exec/work/foreman/QueryManager.java   |  44 ++-
 .../exec/work/fragment/FragmentExecutor.java    |  15 +-
 .../exec/work/fragment/FragmentManager.java     |   6 +
 .../work/fragment/NonRootFragmentManager.java   |   5 +
 .../exec/work/fragment/RootFragmentManager.java |   9 +-
 .../apache/drill/exec/work/user/UserWorker.java |   8 +
 .../exec/server/TestDrillbitResilience.java     | 303 +++++++++++--------
 .../testing/TestCountDownLatchInjection.java    | 155 ++++++++++
 .../drill/exec/testing/TestPauseInjection.java  | 146 ++++++++-
 .../org/apache/drill/exec/proto/BitControl.java |  48 ++-
 .../org/apache/drill/exec/proto/UserProtos.java |  46 ++-
 .../apache/drill/exec/proto/beans/RpcType.java  |   2 +
 protocol/src/main/protobuf/BitControl.proto     |  12 +-
 protocol/src/main/protobuf/User.proto           |   1 +
 34 files changed, 1106 insertions(+), 454 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 136d8c7..c642c4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -312,10 +312,18 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
-    logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+    if(logger.isDebugEnabled()) {
+      logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+    }
     return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
   }
 
+  public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
+    if(logger.isDebugEnabled()) {
+      logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId));
+    }
+    return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
+  }
 
   /**
    * Submits a Logical plan for direct execution (bypasses parsing)

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 7cc52ba..35139d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 public abstract class OperatorContext {
 
@@ -36,6 +37,8 @@ public abstract class OperatorContext {
 
   public abstract OperatorStats getStats();
 
+  public abstract ExecutionControls getExecutionControls();
+
   public static int getChildCount(PhysicalOperator popConfig) {
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 6dbd880..9fa8867 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -24,11 +24,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
 
   private final BufferAllocator allocator;
+  private final ExecutionControls executionControls;
   private boolean closed = false;
   private PhysicalOperator popConfig;
   private OperatorStats stats;
@@ -42,6 +44,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
 
     OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
     this.stats = context.getStats().getOperatorStats(def, allocator);
+    executionControls = context.getExecutionControls();
   }
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
@@ -49,6 +52,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
     this.popConfig = popConfig;
     this.stats     = stats;
+    executionControls = context.getExecutionControls();
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
@@ -70,6 +74,10 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return newBuf;
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   public BufferAllocator getAllocator() {
     if (allocator == null) {
       throw new UnsupportedOperationException("Operator context does not have an allocator");

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c31de66..76dc91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -52,7 +52,6 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
   static class ScreenRoot extends BaseRootExec {
-//    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     private final RecordBatch incoming;
     private final FragmentContext context;
     private final AccountingUserConnection userConnection;
@@ -136,6 +135,11 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
 
+    @Override
+    public void close() throws Exception {
+      injector.injectPause(context.getExecutionControls(), "send-complete", logger);
+      super.close();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index f92bb49..0cfa56e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -40,16 +40,18 @@ public class ControlRpcConfig {
         .name("CONTROL")
         .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
         .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
-        .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+        .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
         .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+        .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
         .build();
   }
 
   public static int RPC_VERSION = 3;
 
   public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+  public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index a4f9fdf..16b9b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,12 +17,9 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import java.util.Collection;
-
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -56,7 +53,12 @@ public class ControlTunnel {
   }
 
   public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
-    CancelFragment b = new CancelFragment(outcomeListener, handle);
+    final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_CANCEL_FRAGMENT);
+    manager.runCommand(b);
+  }
+
+  public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
+    final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
     manager.runCommand(b);
   }
 
@@ -114,17 +116,19 @@ public class ControlTunnel {
     }
   }
 
-  public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
+  public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
     final FragmentHandle handle;
+    final RpcType type;
 
-    public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+    public SignalFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle, RpcType type) {
       super(listener);
       this.handle = handle;
+      this.type = type;
     }
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+      connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
     }
 
   }
@@ -139,7 +143,7 @@ public class ControlTunnel {
 
     @Override
     public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
-      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class);
+      connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index ae728d8..3f8122d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -36,11 +36,12 @@ public class UserRpcConfig {
     return RpcConfig.newBuilder()
         .name("USER")
         .timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
-        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+        .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
         .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
         .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
         .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
         .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+        .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index b3b7ae9..72b07ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -113,6 +113,15 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
         throw new RpcException("Failure while decoding QueryId body.", e);
       }
 
+    case RpcType.RESUME_PAUSED_QUERY_VALUE:
+      try {
+        final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final Ack ack = worker.resumeQuery(queryId);
+        return new Response(RpcType.ACK, ack);
+      } catch (final InvalidProtocolBufferException e) {
+        throw new RpcException("Failure while decoding QueryId body.", e);
+      }
+
     default:
       throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type.  Type was %d.", rpcType));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index cf98b83..a893da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -42,13 +42,16 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
 import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
 import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 
 public class PojoRecordReader<T> extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+  private static final ExecutionControlsInjector injector =
+    ExecutionControlsInjector.getInjector(PojoRecordReader.class);
 
   public final int forJsonIgnore = 1;
 
@@ -64,16 +67,9 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
     this.iterator = iterator;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    operatorContext = context;
     try {
       Field[] fields = pojoClass.getDeclaredFields();
       List<PojoWriter> writers = Lists.newArrayList();
@@ -147,7 +143,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   @Override
   public int next() {
     boolean allocated = false;
-
+    injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
     try {
       int i =0;
       outside:

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
new file mode 100644
index 0000000..de4a181
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * This interface is used internally for tracking injected countdown latches. These latches are specified via the
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * This injection is useful in the case where a thread spawns multiple threads. The parent thread initializes the latch
+ * with the expected number of countdowns and awaits. The child threads count down on the same latch (same site class
+ * and same descriptor), and once there are enough, the parent thread continues.
+ */
+public interface CountDownLatchInjection {
+
+  /**
+   * Initializes the underlying latch.
+   * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
+   */
+  void initialize(final int count);
+
+  /**
+   * Causes the current thread to wait until the latch has counted down to zero, unless the thread is
+   * {@link Thread#interrupt interrupted}.
+   */
+  void await() throws InterruptedException;
+
+  /**
+   * Await without interruption. In the case of interruption, log a warning and continue to wait.
+   */
+  void awaitUninterruptibly();
+
+  /**
+   * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
+   */
+  void countDown();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
new file mode 100644
index 0000000..f4012c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * See {@link org.apache.drill.exec.testing.CountDownLatchInjection}. Comparable to
+ * {@link org.apache.drill.exec.testing.PauseInjection#pause} when initialized to a count of one (non-positive counts
+ * are rejected by {@link #initialize}). In any case, this injection provides more control than PauseInjection.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class CountDownLatchInjectionImpl extends Injection implements CountDownLatchInjection {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountDownLatchInjectionImpl.class);
+
+  private ExtendedLatch latch = null; // created by initialize(); stays null until then
+
+  @JsonCreator // ensures instances are created only through JSON
+  private CountDownLatchInjectionImpl(@JsonProperty("address") final String address,
+                                      @JsonProperty("port") final int port,
+                                      @JsonProperty("siteClass") final String siteClass,
+                                      @JsonProperty("desc") final String desc) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, 0, 1); // nSkip = 0, nFire = 1
+  }
+
+  @Override
+  protected boolean injectNow() {
+    return true; // a latch is always in effect; the skip/fire counters are not consulted
+  }
+
+  @Override
+  public void initialize(final int count) {
+    Preconditions.checkArgument(latch == null, "Latch can be initialized only once at %s in %s.", desc,
+      siteClass.getSimpleName());
+    Preconditions.checkArgument(count > 0, "Count has to be a positive integer at %s in %s.", desc,
+      siteClass.getSimpleName());
+    latch = new ExtendedLatch(count);
+  }
+
+  @Override
+  public void await() throws InterruptedException {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    try {
+      latch.await();
+    } catch (final InterruptedException e) {
+      logger.warn("Interrupted while awaiting in {} at {}.", siteClass.getSimpleName(), desc); // SLF4J uses {}
+      throw e;
+    }
+  }
+
+  @Override
+  public void awaitUninterruptibly() {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    latch.awaitUninterruptibly();
+  }
+
+  @Override
+  public void countDown() {
+    Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+    Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
+    latch.countDown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 1171bf8..639802f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -50,13 +50,15 @@ public final class ExecutionControls {
     controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
   }
 
-  // Jackson MixIn for all types of injections
+  // Jackson MixIn: an annotated class that is used only by Jackson's ObjectMapper to allow a list of injections to
+  // hold various types of injections
   @JsonTypeInfo(
     use = JsonTypeInfo.Id.NAME,
     include = JsonTypeInfo.As.PROPERTY,
     property = "type")
   @JsonSubTypes({
     @Type(value = ExceptionInjection.class, name = "exception"),
+    @Type(value = CountDownLatchInjectionImpl.class, name = "latch"),
     @Type(value = PauseInjection.class, name = "pause")})
   public static abstract class InjectionMixIn {
   }
@@ -99,7 +101,7 @@ public final class ExecutionControls {
       final String jsonString = v.string_val;
       try {
         controlsOptionMapper.readValue(jsonString, Controls.class);
-      } catch (IOException e) {
+      } catch (final IOException e) {
         throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
       }
     }
@@ -137,7 +139,7 @@ public final class ExecutionControls {
     final Controls controls;
     try {
       controls = controlsOptionMapper.readValue(opString, Controls.class);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       // This never happens. opString must have been validated.
       logger.warn("Could not parse injections. Injections must have been validated before this point.");
       throw new DrillRuntimeException("Could not parse injections.", e);
@@ -153,7 +155,7 @@ public final class ExecutionControls {
   }
 
   /**
-   * Look for an exception injection matching the given injector, site description and endpoint.
+   * Look for an exception injection matching the given injector, site descriptor, and endpoint.
    *
    * @param injector the injector, which indicates a class
    * @param desc     the injection site description
@@ -165,7 +167,7 @@ public final class ExecutionControls {
   }
 
   /**
-   * Look for an pause injection matching the given injector, site description and endpoint.
+   * Look for a pause injection matching the given injector, site descriptor, and endpoint.
    *
    * @param injector the injector, which indicates a class
    * @param desc     the injection site description
@@ -176,6 +178,20 @@ public final class ExecutionControls {
     return injection != null ? (PauseInjection) injection : null;
   }
 
+  /**
+   * Find the count down latch injection registered for the given injector, site descriptor, and endpoint.
+   *
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the matching count down latch injection if one exists for the injector, site and endpoint;
+   * otherwise, {@link NoOpControlsInjector#LATCH}, a latch that does nothing
+   */
+  public CountDownLatchInjection lookupCountDownLatchInjection(final ExecutionControlsInjector injector,
+                                                               final String desc) {
+    final Injection found = lookupInjection(injector, desc);
+    return found == null ? NoOpControlsInjector.LATCH : (CountDownLatchInjection) found;
+  }
+
   private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
     if (controls.isEmpty()) {
       return null;
@@ -190,4 +206,15 @@ public final class ExecutionControls {
     // return only if injection was meant for this drillbit
     return injection.isValidForBit(endpoint) ? injection : null;
   }
+
+  /** Resumes every pause within the current context (QueryContext or FragmentContext); other injections are skipped. */
+  public void unpauseAll() {
+    for (final Injection candidate : controls.values()) {
+      if (!(candidate instanceof PauseInjection)) {
+        continue;
+      }
+      final PauseInjection pause = (PauseInjection) candidate;
+      pause.unpause();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 4b1cd0c..05f8433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -75,12 +75,11 @@ public class ExecutionControlsInjector {
    * @param desc              the site description
    *                          throws the exception specified by the injection, if it is time
    */
-  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+  public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
     final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
     if (exceptionInjection != null) {
       exceptionInjection.throwUnchecked();
     }
-    return this;
   }
 
   /**
@@ -95,13 +94,12 @@ public class ExecutionControlsInjector {
    * @param exceptionClass    the expected class of the exception (or a super class of it)
    * @throws T the exception specified by the injection, if it is time
    */
-  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+  public <T extends Throwable> void injectChecked(
     final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
     final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
     if (exceptionInjection != null) {
       exceptionInjection.throwChecked(exceptionClass);
     }
-    return this;
   }
 
   /**
@@ -114,7 +112,7 @@ public class ExecutionControlsInjector {
    * @param desc              the site description
    * @param logger            logger of the class containing the injection site
    */
-  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+  public void injectPause(final ExecutionControls executionControls, final String desc,
                                                final Logger logger) {
     final PauseInjection pauseInjection =
       executionControls.lookupPauseInjection(this, desc);
@@ -124,6 +122,9 @@ public class ExecutionControlsInjector {
       pauseInjection.pause();
       logger.debug("Resuming at {}", desc);
     }
-    return this;
+  }
+
+  public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+    return executionControls.lookupCountDownLatchInjection(this, desc);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index 96fed3a..08ade51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -28,8 +28,8 @@ public abstract class Injection {
 
   protected final String address;  // the address of the drillbit on which to inject
   protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
-  private final Class<?> siteClass; // the class where the injection should happen
-  private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+  protected final Class<?> siteClass; // the class where the injection should happen
+  protected final String desc; // description of the injection site; useful for multiple exception injections in a single class
   private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
   private final AtomicInteger nFire;  // the number of times to do the injection, after any skips; starts > 0
 
@@ -64,7 +64,7 @@ public abstract class Injection {
    *
    * @return if the injection should be injected now
    */
-  protected final boolean injectNow() {
+  protected boolean injectNow() {
     return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 80d9790..33ab783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.testing;
 import org.slf4j.Logger;
 
 /**
- * An injector that does not inject any controls.
+ * An injector that does not inject any controls, useful when not testing (i.e. assertions are not enabled).
  */
 public final class NoOpControlsInjector extends ExecutionControlsInjector {
 
@@ -29,20 +29,42 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
   }
 
   @Override
-  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
-    return this;
+  public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
   }
 
   @Override
-  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+  public <T extends Throwable> void injectChecked(
     final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
-    return this;
   }
 
   @Override
-  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+  public void injectPause(final ExecutionControls executionControls, final String desc,
                                                final Logger logger) {
-    return this;
   }
 
+  /**
+   * When assertions are not enabled, this count down latch that does nothing is injected.
+   */
+  public static final CountDownLatchInjection LATCH = new CountDownLatchInjection() {
+    @Override
+    public void initialize(int count) {
+    }
+
+    @Override
+    public void await() {
+    }
+
+    @Override
+    public void awaitUninterruptibly() {
+    }
+
+    @Override
+    public void countDown() {
+    }
+  };
+
+  @Override
+  public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+    return LATCH;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index e5f9c9c..ff0340b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -21,43 +21,40 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 
 /**
- * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
- * injected pauses; these pauses are specified via
+ * Injection for a single pause. Pause indefinitely until signalled. This class is used internally for tracking
+ * injected pauses. Note that pauses can be fired only once; nFire field is ignored. These pauses are specified via
  * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
  *
- * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ * After the pauses are set, the user sends another signal to unpause all the pauses. This triggers the Foreman to
+ * 1) unpause all pauses in QueryContext, and
+ * 2) send an unpause signal to all fragments, each of which unpauses all pauses in FragmentContext.
  */
 @JsonAutoDetect(fieldVisibility = Visibility.ANY)
 public class PauseInjection extends Injection {
 
-  private final long millis;
+  private final ExtendedLatch latch = new ExtendedLatch(1);
 
   @JsonCreator // ensures instances are created only through JSON
   private PauseInjection(@JsonProperty("address") final String address,
                          @JsonProperty("port") final int port,
                          @JsonProperty("siteClass") final String siteClass,
                          @JsonProperty("desc") final String desc,
-                         @JsonProperty("nSkip") final int nSkip,
-                         @JsonProperty("nFire") final int nFire,
-                         @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
-    super(address, port, siteClass, desc, nSkip, nFire);
-    if (millis <= 0) {
-      throw new InjectionConfigurationException("Pause millis is non-positive.");
-    }
-    this.millis = millis;
+                         @JsonProperty("nSkip") final int nSkip) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, 1);
   }
 
   public void pause() {
-    if (! injectNow()) {
+    if (!injectNow()) {
       return;
     }
-    try {
-      Thread.sleep(millis);
-    } catch (InterruptedException e) {
-      throw new DrillRuntimeException("Well, I should be sleeping.");
-    }
+    latch.awaitUninterruptibly();
+  }
+
+  public void unpause() {
+    latch.countDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 3e4f3d1..f2352e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.QueryManager;
@@ -116,7 +115,7 @@ public class WorkManager implements AutoCloseable {
 
 
     // TODO references to this escape here (via WorkerBee) before construction is done
-    controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
+    controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
     userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
     statusThread = new StatusThread();
     dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
deleted file mode 100644
index b6c6852..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-import static org.apache.drill.exec.rpc.RpcBus.get;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConstants;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.UserRpcException;
-import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-
-public class ControlHandlerImpl implements ControlMessageHandler {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-  private final WorkerBee bee;
-
-  public ControlHandlerImpl(final WorkerBee bee) {
-    this.bee = bee;
-  }
-
-  @Override
-  public Response handle(final ControlConnection connection, final int rpcType,
-      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
-    if (RpcConstants.EXTRA_DEBUGGING) {
-      logger.debug("Received bit com message of type {}", rpcType);
-    }
-
-    switch (rpcType) {
-
-    case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
-      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
-      cancelFragment(handle);
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
-      final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
-      receivingFragmentFinished(finishedReceiver);
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
-      // TODO: Support a type of message that has no response.
-      return DataRpcConfig.OK;
-
-    case RpcType.REQ_QUERY_CANCEL_VALUE: {
-      final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman != null) {
-        foreman.cancel();
-        return DataRpcConfig.OK;
-      } else {
-        return DataRpcConfig.FAIL;
-      }
-    }
-
-    case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
-      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
-      for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewRemoteFragment(fragments.getFragment(i));
-      }
-      return DataRpcConfig.OK;
-    }
-
-    case RpcType.REQ_QUERY_STATUS_VALUE: {
-      final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman == null) {
-        throw new RpcException("Query not running on node.");
-      }
-      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
-      return new Response(RpcType.RESP_QUERY_STATUS, profile);
-    }
-
-    default:
-      throw new RpcException("Not yet supported.");
-    }
-  }
-
-  @Override
-  public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
-    logger.debug("Received remote fragment start instruction", fragment);
-
-    final DrillbitContext drillbitContext = bee.getContext();
-    try {
-      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
-      if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
-            drillbitContext.getFunctionImplementationRegistry());
-        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
-        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
-        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
-            fragment.getFragmentJson());
-        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
-        bee.addFragmentRunner(fr);
-      } else {
-        // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
-        drillbitContext.getWorkBus().addFragmentManager(manager);
-      }
-
-    } catch (final Exception e) {
-        throw new UserRpcException(drillbitContext.getEndpoint(),
-            "Failure while trying to start remote fragment", e);
-    } catch (final OutOfMemoryError t) {
-      if (t.getMessage().startsWith("Direct buffer")) {
-        throw new UserRpcException(drillbitContext.getEndpoint(),
-            "Out of direct memory while trying to start remote fragment", t);
-      } else {
-        throw t;
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
-   */
-  @Override
-  public Ack cancelFragment(final FragmentHandle handle) {
-    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
-    if (manager != null) {
-      // try remote fragment cancel.
-      manager.cancel();
-    } else {
-      // then try local cancel.
-      final FragmentExecutor runner = bee.getFragmentRunner(handle);
-      if (runner != null) {
-        runner.cancel();
-      }
-    }
-
-    return Acks.OK;
-  }
-
-  private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
-
-    final FragmentManager manager =
-        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
-
-    FragmentExecutor executor;
-    if (manager != null) {
-      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
-    } else {
-      // then try local cancel.
-      executor = bee.getFragmentRunner(finishedReceiver.getSender());
-      if (executor != null) {
-        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
-      } else {
-        logger.warn(
-            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
-            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
-            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
-      }
-    }
-
-    return Acks.OK;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index c5d78cc..d12e6d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,24 +17,207 @@
  */
 package org.apache.drill.exec.work.batch;
 
+import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.RpcType;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
+import org.apache.drill.exec.rpc.control.ControlRpcConfig;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
-public interface ControlMessageHandler {
+public class ControlMessageHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
+  private final WorkerBee bee;
 
-  public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
-      throws RpcException;
+  public ControlMessageHandler(final WorkerBee bee) {
+    this.bee = bee;
+  }
 
-  public abstract void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException;
+  public Response handle(final ControlConnection connection, final int rpcType,
+      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+    if (RpcConstants.EXTRA_DEBUGGING) {
+      logger.debug("Received bit com message of type {}", rpcType);
+    }
 
-  public abstract Ack cancelFragment(FragmentHandle handle);
+    switch (rpcType) {
 
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      cancelFragment(handle);
+      return ControlRpcConfig.OK;
+    }
 
-}
\ No newline at end of file
+    case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+      final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+      receivingFragmentFinished(finishedReceiver);
+      return ControlRpcConfig.OK;
+    }
+
+    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+      bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
+      // TODO: Support a type of message that has no response.
+      return ControlRpcConfig.OK;
+
+    case RpcType.REQ_QUERY_CANCEL_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
+      if (foreman != null) {
+        foreman.cancel();
+        return ControlRpcConfig.OK;
+      } else {
+        return ControlRpcConfig.FAIL;
+      }
+    }
+
+    case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+      for(int i = 0; i < fragments.getFragmentCount(); i++) {
+        startNewRemoteFragment(fragments.getFragment(i));
+      }
+      return ControlRpcConfig.OK;
+    }
+
+    case RpcType.REQ_QUERY_STATUS_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
+      if (foreman == null) {
+        throw new RpcException("Query not running on node.");
+      }
+      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+      return new Response(RpcType.RESP_QUERY_STATUS, profile);
+    }
+
+    case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      resumeFragment(handle);
+      return ControlRpcConfig.OK;
+    }
+
+    default:
+      throw new RpcException("Not yet supported.");
+    }
+  }
+
+  private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
+    logger.debug("Received remote fragment start instruction", fragment);
+
+    final DrillbitContext drillbitContext = bee.getContext();
+    try {
+      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+      if (fragment.getLeafFragment()) {
+        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
+            drillbitContext.getFunctionImplementationRegistry());
+        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+            fragment.getFragmentJson());
+        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+        bee.addFragmentRunner(fr);
+      } else {
+        // isIntermediate, store for incoming data.
+        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+        drillbitContext.getWorkBus().addFragmentManager(manager);
+      }
+
+    } catch (final Exception e) {
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Failure while trying to start remote fragment", e);
+    } catch (final OutOfMemoryError t) {
+      if (t.getMessage().startsWith("Direct buffer")) {
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Out of direct memory while trying to start remote fragment", t);
+      } else {
+        throw t;
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
+   */
+  private Ack cancelFragment(final FragmentHandle handle) {
+    // cancel a pending fragment
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+    if (manager != null) {
+      manager.cancel();
+      return Acks.OK;
+    }
+
+    // cancel a running fragment
+    final FragmentExecutor runner = bee.getFragmentRunner(handle);
+    if (runner != null) {
+      runner.cancel();
+      return Acks.OK;
+    }
+
+    // fragment completed or does not exist
+    logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    return Acks.OK;
+  }
+
+  private Ack resumeFragment(final FragmentHandle handle) {
+    // resume a pending fragment
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+    if (manager != null) {
+      manager.unpause();
+      return Acks.OK;
+    }
+
+    // resume a paused fragment
+    final FragmentExecutor runner = bee.getFragmentRunner(handle);
+    if (runner != null) {
+      runner.unpause();
+      return Acks.OK;
+    }
+
+    // fragment completed or does not exist
+    logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    return Acks.OK;
+  }
+
+  private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
+    final FragmentManager manager =
+        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+
+    FragmentExecutor executor;
+    if (manager != null) {
+      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
+    } else {
+      executor = bee.getFragmentRunner(finishedReceiver.getSender());
+      if (executor != null) {
+        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      } else {
+        logger.warn(
+            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+      }
+    }
+
+    return Acks.OK;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4d403b8..0122ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -119,6 +119,7 @@ public class Foreman implements Runnable {
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
   private volatile QueryState state;
+  private boolean resume = false;
 
   private volatile DistributedLease lease; // used to limit the number of concurrent queries
 
@@ -196,6 +197,18 @@ public class Foreman implements Runnable {
   }
 
   /**
+   * Resume the query. Regardless of the current state, this method sends a resume signal to all fragments.
+   * This method can be called multiple times.
+   */
+  public void resume() {
+    resume = true;
+    // resume all pauses through query context
+    queryContext.getExecutionControls().unpauseAll();
+    // resume all pauses through all fragment contexts
+    queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+  }
+
+  /**
    * Called by execution pool to do query setup, and kick off remote execution.
    *
    * <p>Note that completion of this function is not the end of the Foreman's role
@@ -268,9 +281,20 @@ public class Foreman implements Runnable {
        * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
        * make sure that we can't make things any worse as those events are delivered, but allow
        * any necessary remaining cleanup to proceed.
+       *
+       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
+       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
+       * to accept events.
        */
       acceptExternalEvents.countDown();
 
+      // If we received the resume signal before fragments are setup, the first call does not actually resume the
+      // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+      if(resume) {
+        resume();
+      }
+      injector.injectPause(queryContext.getExecutionControls(), "foreman-ready", logger);
+
       // restore the thread's original name
       currentThread.setName(originalName);
     }
@@ -375,7 +399,6 @@ public class Foreman implements Runnable {
     drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
     logger.debug("Submitting fragments to run.");
-    injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
 
     // set up the root fragment first so we'll have incoming buffers available.
     setupRootFragment(rootPlanFragment, work.getRootOperator());

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index c4646bd..090a377 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -189,7 +189,8 @@ public class QueryManager {
           final DrillbitEndpoint endpoint = data.getEndpoint();
           final FragmentHandle handle = data.getHandle();
           // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
-          controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+          controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+            SignalListener.Signal.CANCEL), handle);
         }
         break;
 
@@ -203,25 +204,52 @@ public class QueryManager {
     }
   }
 
+  /**
+   * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
+   * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
+   * control tunnel.
+   */
+  void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+    if (rootRunner != null) {
+      rootRunner.unpause();
+    }
+    final Controller controller = drillbitContext.getController();
+    for(final FragmentData data : fragmentDataSet) {
+      final DrillbitEndpoint endpoint = data.getEndpoint();
+      final FragmentHandle handle = data.getHandle();
+      controller.getTunnel(endpoint).resumeFragment(new SignalListener(endpoint, handle,
+        SignalListener.Signal.UNPAUSE), handle);
+    }
+  }
+
   /*
    * This assumes that the FragmentStatusListener implementation takes action when it hears
-   * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
    * but log messages.
    */
-  private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
-    public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+  private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
+    /**
+     * An enum of possible signals that {@link SignalListener} listens to.
+     */
+    public static enum Signal { CANCEL, UNPAUSE }
+
+    private final Signal signal;
+
+    public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
       super(endpoint, handle);
+      this.signal = signal;
     }
 
     @Override
     public void failed(final RpcException ex) {
-      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+      logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", signal, value, endpoint, ex);
     }
 
     @Override
-    public void success(final Ack value, final ByteBuf buf) {
-      if (!value.getOk()) {
-        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+    public void success(final Ack ack, final ByteBuf buf) {
+      if (!ack.getOk()) {
+        logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
+          ack);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7baafc4..24e2556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -108,7 +108,7 @@ public class FragmentExecutor implements Runnable {
 
   /**
    * Cancel the execution of this fragment is in an appropriate state. Messages come from external.
-   * NOTE that this can be called from threads *other* than the one running this runnable(),
+   * NOTE that this will be called from threads *other* than the one running this runnable(),
    * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
@@ -140,11 +140,18 @@ public class FragmentExecutor implements Runnable {
   }
 
   /**
+   * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
+   * the one running this runnable(). Also, this method can be called multiple times.
+   */
+  public synchronized void unpause() {
+    fragmentContext.getExecutionControls().unpauseAll();
+  }
+
+  /**
    * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
    * called in the case that a limit query is executed.
    *
-   * @param handle
-   *          The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+   * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
    */
   public void receivingFragmentFinished(final FragmentHandle handle) {
     acceptExternalEvents.awaitUninterruptibly();
@@ -277,6 +284,8 @@ public class FragmentExecutor implements Runnable {
 
     // first close the operators and release all memory.
     try {
+      // Say executor was cancelled before setup. Now when executor actually runs, root is not initialized, but this
+      // method is called in finally. So root can be null.
       if (root != null) {
         root.close();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 0ba91b4..ad880da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -49,6 +49,12 @@ public interface FragmentManager {
 
   public abstract void cancel();
 
+  /**
+   * If the executor is paused (for testing), this method should unpause the executor. This method should handle
+   * multiple calls.
+   */
+  public abstract void unpause();
+
   public boolean isWaiting();
 
   public abstract FragmentHandle getHandle();

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index f526fbe..ca5d5b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -112,6 +112,11 @@ public class NonRootFragmentManager implements FragmentManager {
   }
 
   @Override
+  public void unpause() {
+    runner.unpause();
+  }
+
+  @Override
   public FragmentHandle getHandle() {
     return fragment.getHandle();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b1c3fe0..67ef9b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -29,8 +29,8 @@ import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 // TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+public class RootFragmentManager implements FragmentManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
   private final IncomingBuffers buffers;
   private final FragmentExecutor runner;
@@ -70,6 +70,11 @@ public class RootFragmentManager implements FragmentManager{
   }
 
   @Override
+  public void unpause() {
+    runner.unpause();
+  }
+
+  @Override
   public boolean isWaiting() {
     return !buffers.isDone() && !cancel;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 8854ef3..e8deb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -68,6 +68,14 @@ public class UserWorker{
     return Acks.OK;
   }
 
+  public Ack resumeQuery(final QueryId queryId) {
+    final Foreman foreman = bee.getForemanForQueryId(queryId);
+    if (foreman != null) {
+      foreman.resume();
+    }
+    return Acks.OK;
+  }
+
   public OptionManager getSystemOptions() {
     return bee.getContext().getOptionManager();
   }


[3/3] drill git commit: DRILL-2755: Use and handle InterruptedException during query processing.

Posted by ve...@apache.org.
DRILL-2755: Use and handle InterruptedException during query processing.

- Interrupt FragmentExecutor thread as part of FragmentExecutor.cancel()
- Handle InterruptedException in ExternalSortBatch.newSV2(). If the fragment status says it
  should not continue, rethrow the InterruptedException to the caller, which returns IterOutcome.STOP
- Add comments explaining why InterruptedException is not acted upon in SendingAccountor.waitForSendComplete()
- Handle InterruptedException in OrderedPartitionRecordBatch.getPartitionVectors()
  If interrupted in Thread.sleep calls and fragment status says should not run, then
  return IterOutcome.STOP downstream.
- Interrupt partitioner threads if PartitionerRecordBatch is interrupted while waiting for
  partitioner threads to complete.
- Preserve interrupt status if not handled
- Handle null RecordBatches returned by RawBatchBuffer.getNext() in MergingRecordBatch.buildSchema()
- Change timeout in Foreman to be proportional to the number of intermediate fragments sent instead
  of hard coded limit of 90s.
- Change TimedRunnable to enforce a timeout of 15s per runnable.
  Total timeout is (15s * numOfRunnableTasks) / parallelism.
- Add unit tests
  * Testing cancelling a query interrupts the query fragments which are currently blocked
  * Testing interrupting the partitioner sender which in turn interrupts its helper threads
  * Testing TimedRunnable enforces timeout for the whole task list.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3a294abc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3a294abc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3a294abc

Branch: refs/heads/master
Commit: 3a294abcc51148e0e79096af5e6d3c45b7c19a9d
Parents: f8e5e61
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 6 09:34:25 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun May 10 12:24:45 2015 -0700

----------------------------------------------------------------------
 .../drill/common/concurrent/ExtendedLatch.java  |  21 +---
 .../apache/drill/exec/ops/SendingAccountor.java |  13 +++
 .../impl/mergereceiver/MergingRecordBatch.java  |  19 +++
 .../OrderedPartitionRecordBatch.java            |  31 ++++-
 .../partitionsender/PartitionerDecorator.java   |  70 +++++++++--
 .../partitionsender/PartitionerTemplate.java    |   3 +-
 .../UnorderedReceiverBatch.java                 |  22 +++-
 .../physical/impl/xsort/ExternalSortBatch.java  |  10 +-
 .../exec/record/RawFragmentBatchProvider.java   |   2 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |   9 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |   7 +-
 .../drill/exec/rpc/ReconnectingConnection.java  |   7 +-
 .../apache/drill/exec/rpc/RemoteConnection.java |  18 ++-
 .../apache/drill/exec/rpc/data/DataTunnel.java  |  14 ++-
 .../org/apache/drill/exec/server/Drillbit.java  |   4 +
 .../apache/drill/exec/store/TimedRunnable.java  |  52 +++++++--
 .../exec/testing/CountDownLatchInjection.java   |   5 +
 .../testing/CountDownLatchInjectionImpl.java    |   5 +
 .../exec/testing/ExecutionControlsInjector.java |  28 +++++
 .../exec/testing/NoOpControlsInjector.java      |   4 +
 .../drill/exec/testing/PauseInjection.java      |   7 ++
 .../org/apache/drill/exec/work/WorkManager.java |  24 ++--
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   9 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |   8 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  14 ++-
 .../exec/work/fragment/FragmentExecutor.java    |  14 +++
 .../exec/server/TestDrillbitResilience.java     | 117 ++++++++++++++++++-
 .../drill/exec/store/TestTimedRunnable.java     | 103 ++++++++++++++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     |   4 +
 29 files changed, 546 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index a75ac32..3e14f8a 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -35,16 +35,6 @@ public class ExtendedLatch extends CountDownLatch {
   }
 
   /**
-   * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
-   * state variable or similar.
-   *
-   * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
-   */
-  protected boolean ignoreInterruptions() {
-    return true;
-  }
-
-  /**
    * Await without interruption for a given time.
    * @param waitMillis
    *          Time in milliseconds to wait
@@ -68,8 +58,7 @@ public class ExtendedLatch extends CountDownLatch {
   }
 
   /**
-   * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
-   * output of ignoreInterruptions();
+   * Await without interruption. In the case of interruption, log a warning and continue to wait.
    */
   public void awaitUninterruptibly() {
     while (true) {
@@ -77,12 +66,8 @@ public class ExtendedLatch extends CountDownLatch {
         await();
         return;
       } catch (final InterruptedException e) {
-        if (ignoreInterruptions()) {
-          // if we're still not ready, the while loop will cause us to wait again
-          logger.warn("Interrupted while waiting for event latch.", e);
-        } else {
-          return;
-        }
+        // if we're still not ready, the while loop will cause us to wait again
+        logger.warn("Interrupted while waiting for event latch.", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index 0cb5fbf..5db2cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -42,13 +42,26 @@ class SendingAccountor {
 
   public synchronized void waitForSendComplete() {
       int waitForBatches = batchesSent.get();
+      boolean isInterrupted = false;
       while(waitForBatches != 0) {
         try {
           wait.acquire(waitForBatches);
           waitForBatches = batchesSent.addAndGet(-1 * waitForBatches);
         } catch (InterruptedException e) {
+          // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+          // to send. This should be safe because: network connections should get disconnected and fail a send if a
+          // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+          // (blocking is cooperative and will get triggered before this)
           logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
+
+          isInterrupted = true;
         }
       }
+
+      if (isInterrupted) {
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
+      }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f19f371..5d990f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -70,6 +70,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
@@ -88,6 +89,7 @@ import com.sun.codemodel.JExpr;
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(MergingRecordBatch.class);
 
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
@@ -141,6 +143,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
       final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
@@ -148,6 +151,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
       }
       return b;
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
     } finally {
       stats.stopWait();
     }
@@ -359,6 +368,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
             nextBatch = getNext(node.batchId);
           }
+
           assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
               : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
           if (nextBatch == null && !context.shouldContinue()) {
@@ -461,6 +471,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           return;
         }
         final RawFragmentBatch batch = getNext(i);
+        if (batch == null) {
+          if (!context.shouldContinue()) {
+            state = BatchState.STOP;
+          } else {
+            state = BatchState.DONE;
+          }
+
+          break;
+        }
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
           continue;

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 63b7eba..ca6d83c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -246,6 +246,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   /**
+   * Wait until at least the given timeout has expired, or until interrupted while the fragment status is not runnable.
+   * @param timeout Timeout in milliseconds.
+   * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+   */
+  private boolean waitUntilTimeOut(final long timeout) {
+    while(true) {
+      try {
+        Thread.sleep(timeout);
+        return true;
+      } catch (final InterruptedException e) {
+        if (!context.shouldContinue()) {
+          return false;
+        }
+      }
+    }
+  }
+
+  /**
    * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
    * that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
    * distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
@@ -255,10 +273,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * @return True is successful. False if failed.
    */
   private boolean getPartitionVectors() {
-
-
     try {
-
       if (!saveSamples()) {
         return false;
       }
@@ -279,14 +294,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         // TODO: this should be polling.
 
         if (val < fragmentsBeforeProceed) {
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         for (int i = 0; i < 100 && finalTable == null; i++) {
           finalTable = tableMap.get(finalTableKey);
           if (finalTable != null) {
             break;
           }
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         if (finalTable == null) {
           buildTable();
@@ -302,7 +321,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         partitionVectors.add(w.getValueVector());
       }
 
-    } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
+    } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
       kill(false);
       context.fail(ex);
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c3261dc..c355070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -28,6 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.testing.CountDownLatchInjection;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 /**
  * Decorator class to hide multiple Partitioner existence from the caller
@@ -38,19 +41,22 @@ import com.google.common.collect.Lists;
  * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
  */
 public class PartitionerDecorator {
-
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+  private static final ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(PartitionerDecorator.class);
 
   private List<Partitioner> partitioners;
   private final OperatorStats stats;
   private final String tName;
   private final String childThreadPrefix;
   private final ExecutorService executor;
+  private final FragmentContext context;
 
 
   public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
     this.partitioners = partitioners;
     this.stats = stats;
+    this.context = context;
     this.executor = context.getDrillbitContext().getExecutor();
     this.tName = Thread.currentThread().getName();
     this.childThreadPrefix = "Partitioner-" + tName + "-";
@@ -145,17 +151,42 @@ public class PartitionerDecorator {
     stats.startWait();
     final CountDownLatch latch = new CountDownLatch(partitioners.size());
     final List<CustomRunnable> runnables = Lists.newArrayList();
+    final List<Future> taskFutures = Lists.newArrayList();
+    CountDownLatchInjection testCountDownLatch = null;
     try {
-      int i = 0;
-      for (final Partitioner part : partitioners ) {
-        runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
-        executor.submit(runnables.get(i++));
+      // To simulate interruption of the main fragment thread and, in turn, of the partitioner threads, create a
+      // CountDownLatchInjection latch. Partitioner threads await on the latch and main fragment thread counts down or
+      // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
+      testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
+      testCountDownLatch.initialize(1);
+      for (final Partitioner part : partitioners) {
+        final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
+        runnables.add(runnable);
+        taskFutures.add(executor.submit(runnable));
       }
-      try {
-        latch.await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+
+      while (true) {
+        try {
+          // Wait for main fragment interruption.
+          injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+
+          // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
+          injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
+
+          latch.await();
+          break;
+        } catch (final InterruptedException e) {
+          // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
+          if (!context.shouldContinue()) {
+            for(Future f : taskFutures) {
+              f.cancel(true);
+            }
+
+            break;
+          }
+        }
       }
+
       IOException excep = null;
       for (final CustomRunnable runnable : runnables ) {
         IOException myException = runnable.getException();
@@ -180,8 +211,12 @@ public class PartitionerDecorator {
       // scale down main stats wait time based on calculated processing time
       // since we did not wait for whole duration of above execution
       stats.adjustWaitNanos(-maxProcessTime);
-    }
 
+      // Done with the latch, close it.
+      if (testCountDownLatch != null) {
+        testCountDownLatch.close();
+      }
+    }
   }
 
   /**
@@ -190,7 +225,7 @@ public class PartitionerDecorator {
    * protected is for testing purposes
    */
   protected interface GeneralExecuteIface {
-    public void execute(Partitioner partitioner) throws IOException;
+    void execute(Partitioner partitioner) throws IOException;
   }
 
   /**
@@ -242,17 +277,28 @@ public class PartitionerDecorator {
     private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
     private final Partitioner part;
+    private CountDownLatchInjection testCountDownLatch;
+
     private volatile IOException exp;
 
-    public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
+        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
       this.parentThreadName = parentThreadName;
       this.latch = latch;
       this.iface = iface;
       this.part = part;
+      this.testCountDownLatch = testCountDownLatch;
     }
 
     @Override
     public void run() {
+      // Test only - Pause until interrupted by fragment thread
+      try {
+        testCountDownLatch.await();
+      } catch (final InterruptedException e) {
+        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+      }
+
       final Thread currThread = Thread.currentThread();
       final String currThreadName = currThread.getName();
       final OperatorStats localStats = part.getStats();

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index cbea267..aeac01d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -285,7 +285,8 @@ public abstract class PartitionerTemplate implements Partitioner {
       //      to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
       //      sender has acknowledged the terminate request. After sending the last batch, all further batches are
       //      dropped.
-      final boolean isLastBatch = isLast || terminated;
+      //   3. Partitioner thread is interrupted due to cancellation of fragment.
+      final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
 
       // if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
       if (!isLastBatch && recordCount == 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 66a2092..e40fe54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -49,9 +49,12 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class UnorderedReceiverBatch implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private final static ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(UnorderedReceiverBatch.class);
 
   private final RecordBatchLoader batchLoader;
   private final RawFragmentBatchProvider fragProvider;
@@ -133,6 +136,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
+  private RawFragmentBatch getNextBatch() throws IOException {
+    try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
+      return fragProvider.getNext();
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
+    }
+  }
+
   @Override
   public IterOutcome next() {
     stats.startProcessing();
@@ -140,11 +156,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       RawFragmentBatch batch;
       try {
         stats.startWait();
-        batch = fragProvider.getNext();
+        batch = getNextBatch();
 
         // skip over empty batches. we do this since these are basically control messages.
         while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
-          batch = fragProvider.getNext();
+          batch = getNextBatch();
         }
       } finally {
         stats.stopWait();

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index aab3391..3159811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -277,6 +277,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           } else {
             try {
               sv2 = newSV2();
+            } catch(InterruptedException e) {
+              return IterOutcome.STOP;
             } catch (OutOfMemoryException e) {
               throw new OutOfMemoryRuntimeException(e);
             }
@@ -496,7 +498,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return size;
   }
 
-  private SelectionVector2 newSV2() throws OutOfMemoryException {
+  private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
     if (!sv2.allocateNew(incoming.getRecordCount())) {
       try {
@@ -509,8 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       while (true) {
         try {
           Thread.sleep(waitTime * 1000);
-        } catch (InterruptedException e) {
-          throw new OutOfMemoryException(e);
+        } catch(final InterruptedException e) {
+          if (!context.shouldContinue()) {
+            throw e;
+          }
         }
         waitTime *= 2;
         if (sv2.allocateNew(incoming.getRecordCount())) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index d4dfe96..030785c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 
 public interface RawFragmentBatchProvider {
 
-  public RawFragmentBatch getNext() throws IOException;
+  public RawFragmentBatch getNext() throws IOException, InterruptedException;
   public void kill(FragmentContext context);
   public void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 1661f81..d551173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -263,9 +263,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     logger.debug("Closing client");
     try {
       connection.getChannel().close().get();
-    } catch (InterruptedException | ExecutionException e) {
-      logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
-      // TODO InterruptedException
+    } catch (final InterruptedException | ExecutionException e) {
+      logger.warn("Failure while shutting {}", this.getClass().getName(), e);
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a148436..6a7bc65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -204,9 +204,12 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   public void close() throws IOException {
     try {
       eventLoopGroup.shutdownGracefully().get();
-    } catch (InterruptedException | ExecutionException e) {
+    } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index 9948d3e..f0787a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -112,9 +112,12 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
           cmd.connectionSucceeded(connection);
 //          logger.debug("Finished connection succeeded activity.");
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         cmd.connectionFailed(FailureType.CONNECTION, e);
-        // TODO InterruptedException
+
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
       } catch (ExecutionException e) {
         throw new IllegalStateException();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0f095c0..2ee9263 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -70,9 +70,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
     try{
       writeManager.waitForWritable();
       return true;
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       listener.failed(new RpcException(e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
       return false;
     }
   }
@@ -131,10 +135,14 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
       if (channel.isActive()) {
         channel.close().get();
       }
-    } catch (InterruptedException | ExecutionException e) {
+      channel.close().get();
+    } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 11f5496..ed31bed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -48,9 +48,12 @@ public class DataTunnel {
     try{
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -59,9 +62,12 @@ public class DataTunnel {
     try{
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
     return b.getFuture();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index e7a9a3c..531253e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -271,6 +271,10 @@ public class Drillbit implements AutoCloseable {
       Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
     } catch (final InterruptedException e) {
       logger.warn("Interrupted while sleeping during coordination deregistration.");
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
 
     if (embeddedJetty != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 0fb778b..e52820b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 
 import com.google.common.base.Stopwatch;
@@ -36,6 +41,8 @@ import com.google.common.collect.Lists;
  */
 public abstract class TimedRunnable<V> implements Runnable {
 
+  private static int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+
   private volatile Exception e;
   private volatile long timeNanos;
   private volatile V value;
@@ -91,10 +98,13 @@ public abstract class TimedRunnable<V> implements Runnable {
   }
 
   /**
-   * Execute the list of runnables with the given parallelization.  At end, return values and report completion time stats to provided logger.
+   * Execute the list of runnables with the given parallelization.  At end, return values and report completion time
+   * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout is exceeded, running/pending
+   * tasks will be cancelled and a {@link UserException} is thrown.
    * @param activity Name of activity for reporting in logger.
    * @param logger The logger to use to report results.
-   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be completed in-thread.
+   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be
+   *                  completed in-thread. Runnables must handle {@link InterruptedException}s.
    * @param parallelism  The number of threads that should be run to complete this task.
    * @return The list of outcome objects.
    * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
@@ -107,25 +117,43 @@ public abstract class TimedRunnable<V> implements Runnable {
       runnables.get(0).run();
     }else{
       parallelism = Math.min(parallelism,  runnables.size());
-      final CountDownLatch latch = new CountDownLatch(runnables.size());
+      final ExtendedLatch latch = new ExtendedLatch(runnables.size());
       final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
       try{
         for(TimedRunnable<V> runnable : runnables){
           threadPool.submit(new LatchedRunnable(latch, runnable));
         }
-      }finally{
-        threadPool.shutdown();
-      }
 
-      try{
-        latch.await();
-      }catch(InterruptedException e){
-        // TODO interrupted exception.
-        throw new RuntimeException(e);
+        final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism);
+        if (!latch.awaitUninterruptibly(timeout)) {
+          // Issue a shutdown request. This interrupts the running tasks and cancels the pending ones.
+          // It is highly important that the task Runnables are handling interrupts correctly.
+          threadPool.shutdownNow();
+
+          try {
+            // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+            // any running threads. If the runnables are handling the interrupts properly they should be able to
+            // wrap up and terminate. If not, waiting for 5s here gives a chance to identify and log any potential
+            // thread leaks.
+            threadPool.awaitTermination(5, TimeUnit.SECONDS);
+          } catch (final InterruptedException e) {
+            logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
+          }
+
+          final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
+              "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
+          logger.error(errMsg);
+          throw UserException.resourceError()
+              .message(errMsg)
+              .build();
+        }
+      } finally {
+        if (!threadPool.isShutdown()) {
+          threadPool.shutdown();
+        }
       }
     }
 
-
     List<V> values = Lists.newArrayList();
     long sum = 0;
     long max = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index de4a181..d26e2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -48,4 +48,9 @@ public interface CountDownLatchInjection {
    * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
    */
   void countDown();
+
+  /**
+   * Close the latch.
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index f4012c1..561d816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -82,4 +82,9 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
     Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
     latch.countDown();
   }
+
+  @Override
+  public void close() {
+    latch = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 05f8433..387d300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -124,6 +124,34 @@ public class ExecutionControlsInjector {
     }
   }
 
+  /**
+   * Insert a pause that can be interrupted using {@link Thread#interrupt()} at the given site point, if such an
+   * injection is specified (i.e. matches the site description).
+   * <p/>
+   * <p>Implementors use this in their code at a site where they want to simulate an interruptible pause
+   * during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param logger            logger of the class containing the injection site
+   * @throws InterruptedException if interrupted using {@link Thread#interrupt()}
+   */
+  public void injectInterruptiblePause(final ExecutionControls executionControls, final String desc,
+      final Logger logger) throws InterruptedException {
+    final PauseInjection pauseInjection = executionControls.lookupPauseInjection(this, desc);
+
+    if (pauseInjection != null) {
+      logger.debug("Interruptible pausing at {}", desc);
+      try {
+        pauseInjection.interruptiblePause();
+      } catch (final InterruptedException e) {
+        logger.debug("Pause interrupted at {}", desc);
+        throw e;
+      }
+      logger.debug("Interruptible pause resuming at {}", desc);
+    }
+  }
+
   public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
     return executionControls.lookupCountDownLatchInjection(this, desc);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 33ab783..bb13d1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -61,6 +61,10 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
     @Override
     public void countDown() {
     }
+
+    @Override
+    public void close() {
+    }
   };
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index ff0340b..fc4d8ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -54,6 +54,13 @@ public class PauseInjection extends Injection {
     latch.awaitUninterruptibly();
   }
 
+  public void interruptiblePause() throws InterruptedException {
+    if (!injectNow()) {
+      return;
+    }
+    latch.await();
+  }
+
   public void unpause() {
     latch.countDown();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index f2352e6..1d3a0b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,13 +21,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.SelfCleaningRunnable;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -173,6 +173,10 @@ public class WorkManager implements AutoCloseable {
       }
     } catch (final InterruptedException e) {
       logger.warn("Executor interrupted while awaiting termination");
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -180,7 +184,7 @@ public class WorkManager implements AutoCloseable {
     return dContext;
   }
 
-  private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+  private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running
 
   /**
    * Waits until it is safe to exit. Blocks until all currently running fragments have completed.
@@ -193,17 +197,11 @@ public class WorkManager implements AutoCloseable {
         return;
       }
 
-      exitLatch = new CountDownLatch(1);
+      exitLatch = new ExtendedLatch();
     }
 
-    while(true) {
-      try {
-        exitLatch.await(5, TimeUnit.SECONDS);
-      } catch(final InterruptedException e) {
-        // keep waiting
-      }
-      break;
-    }
+    // Wait for at most 5 seconds or until the latch is released.
+    exitLatch.awaitUninterruptibly(5000);
   }
 
   /**
@@ -328,6 +326,10 @@ public class WorkManager implements AutoCloseable {
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
         } catch(final InterruptedException e) {
+          // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+          // interruption and respond to it if it wants to.
+          Thread.currentThread().interrupt();
+
           // exit status thread on interrupt.
           break;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 2a79e42..07a3505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -141,7 +141,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
   }
 
   @Override
-  public RawFragmentBatch getNext() throws IOException {
+  public RawFragmentBatch getNext() throws IOException, InterruptedException {
     if (outOfMemory && buffer.size() < 10) {
       outOfMemory = false;
       fragmentManager.setAutoRead(true);
@@ -160,9 +160,12 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
         }
         queueSize -= w.getBodySize();
         return batch;
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
+
         return null;
-        // TODO InterruptedException
       }
     }
     if (w == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index d23655c..4750666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
 
   private static enum BufferState {
     INIT,
@@ -149,7 +149,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public RawFragmentBatch getNext() {
+  public RawFragmentBatch getNext() throws IOException, InterruptedException {
 
     if (outOfMemory.get() && buffer.size() < 10) {
       logger.trace("Setting autoread true");
@@ -166,8 +166,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       try {
         b = buffer.take();
       } catch (final InterruptedException e) {
-        return null;
-        // TODO InterruptedException
+        logger.debug("Interrupted while waiting for incoming data.", e);
+        throw e;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0122ef8..bf62ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
   private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
-  private static final int RPC_WAIT_IN_SECONDS = 90;
+  private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
 
   private final QueryId queryId;
   private final RunQuery queryRequest;
@@ -967,7 +967,8 @@ public class Foreman implements Runnable {
      * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
      * know if any submissions did fail.
      */
-    final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
+    final int numIntFragments = intFragmentMap.keySet().size();
+    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
@@ -975,16 +976,17 @@ public class Foreman implements Runnable {
       sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
-    if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+    if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
       long numberRemaining = endpointLatch.getCount();
       throw UserException.connectionError()
           .message(
-              "Exceeded timeout while waiting send intermediate work fragments to remote nodes.  Sent %d and only heard response back from %d nodes.",
-              intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+              "Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+                  "Sent %d and only heard response back from %d nodes.",
+              timeout, numIntFragments, numIntFragments - numberRemaining)
           .build();
     }
 
-
     // if any of the intermediate fragment submissions failed, fail the query
     final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
     if (submissionExceptions.size() > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 24e2556..d96e6d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -62,6 +62,9 @@ public class FragmentExecutor implements Runnable {
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
   private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
 
+  // Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or has finished.
+  private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
+
   public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
                           final StatusReporter listener) {
     this.fragmentContext = context;
@@ -136,6 +139,14 @@ public class FragmentExecutor implements Runnable {
        * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
        */
       updateState(FragmentState.CANCELLATION_REQUESTED);
+
+      /*
+       * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+       */
+      final Thread myThread = myThreadRef.get();
+      if (myThread != null) {
+        myThread.interrupt();
+      }
     }
   }
 
@@ -168,6 +179,7 @@ public class FragmentExecutor implements Runnable {
   @Override
   public void run() {
     final Thread myThread = Thread.currentThread();
+    myThreadRef.set(myThread);
     final String originalThreadName = myThread.getName();
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
     final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
@@ -244,6 +256,8 @@ public class FragmentExecutor implements Runnable {
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       myThread.setName(originalThreadName);
+
+      myThreadRef.set(null);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 3e4dcb2..d72d498 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.server;
 
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -43,6 +47,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -437,6 +445,20 @@ public class TestDrillbitResilience {
     }
   }
 
+  private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+    private boolean cancelRequested = false;
+
+    @Override
+    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+      if (!cancelRequested) {
+        check(queryId != null, "Query id should not be null, since we have waited long enough.");
+        (new CancellingThread(queryId, ex, null)).start();
+        cancelRequested = true;
+      }
+      result.release();
+    }
+  };
+
   /**
    * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
    */
@@ -459,7 +481,9 @@ public class TestDrillbitResilience {
       } catch (final RpcException ex) {
         this.ex.value = ex;
       }
-      latch.countDown();
+      if (latch != null) {
+        latch.countDown();
+      }
     }
   }
 
@@ -507,13 +531,33 @@ public class TestDrillbitResilience {
    * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
    */
   private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    assertCancelled(controls, TEST_QUERY, listener);
+  }
+
+  /**
+   * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
+   */
+  private static void assertCancelled(final String controls, final String testQuery,
+      final WaitUntilCompleteListener listener) {
     setControls(controls);
 
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, testQuery, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertCompleteState(result, QueryState.CANCELED);
   }
 
+  private static void setSessionOption(final String option, final String value) {
+    try {
+      final List<QueryDataBatch> results = drillClient.runQuery(QueryType.SQL,
+          String.format("alter session set `%s` = %s", option, value));
+      for (final QueryDataBatch data : results) {
+        data.release();
+      }
+    } catch(RpcException e) {
+      fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString()));
+    }
+  }
+
   private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
     return "{\"injections\" : [{"
       + "\"type\" : \"pause\"," +
@@ -667,4 +711,73 @@ public class TestDrillbitResilience {
     final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
+
+  /**
+   * Tests that cancelling a query interrupts FragmentExecutor threads blocked waiting for some event to happen.
+   * Specifically, tests cancelling a fragment that has a {@link MergingRecordBatch} blocked waiting for data.
+   */
+  @Test
+  public void testInterruptingBlockedMergingRecordBatch() {
+    final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
+    testInterruptingBlockedFragmentsWaitingForData(control);
+  }
+
+  /**
+   * Tests that cancelling a query interrupts FragmentExecutor threads blocked waiting for some event to happen.
+   * Specifically, tests cancelling a fragment that has an {@link UnorderedReceiverBatch} blocked waiting for data.
+   */
+  @Test
+  public void testInterruptingBlockedUnorderedReceiverBatch() {
+    final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
+    testInterruptingBlockedFragmentsWaitingForData(control);
+  }
+
+  private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
+    try {
+      setSessionOption(SLICE_TARGET, "1");
+      setSessionOption(HASHAGG.getOptionName(), "false");
+
+      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+      assertCancelled(control, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+    }
+  }
+
+  /**
+   * Tests interrupting the fragment thread that is running {@link PartitionSenderRootExec}.
+   * {@link PartitionSenderRootExec} spawns threads for the partitioner. Interrupting the fragment thread should also
+   * interrupt the partitioner threads.
+   */
+  @Test
+  public void testInterruptingPartitionerThreadFragment() {
+    try {
+      setSessionOption(SLICE_TARGET, "1");
+      setSessionOption(HASHAGG.getOptionName(), "true");
+      setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+
+      final String controls = "{\"injections\" : ["
+          + "{"
+          + "\"type\" : \"latch\","
+          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+          + "\"desc\" : \"partitioner-sender-latch\""
+          + "},"
+          + "{"
+          + "\"type\" : \"pause\","
+          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+          + "\"desc\" : \"wait-for-fragment-interrupt\","
+          + "\"nSkip\" : 1"
+          + "}" +
+          "]}";
+
+      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+      assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+      setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(),
+          Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
new file mode 100644
index 0000000..2807c35
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.test.DrillTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link TimedRunnable}.
+ */
+public class TestTimedRunnable extends DrillTest {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+
+  @Rule
+  public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
+
+  private static class TestTask extends TimedRunnable {
+    final long sleepTime; // sleep time in ms
+
+    public TestTask(final long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+
+    @Override
+    protected Void runInner() throws Exception {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        throw e;
+      }
+      return null;
+    }
+
+    @Override
+    protected IOException convertToIOException(Exception e) {
+      return new IOException("Failure while trying to sleep for sometime", e);
+    }
+  }
+
+  @Test
+  public void withoutAnyTasksTriggeringTimeout() throws Exception {
+    List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+    for(int i=0; i<100; i++){
+      tasks.add(new TestTask(2000));
+    }
+
+    TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16);
+  }
+
+  @Test
+  public void withTasksExceedingTimeout() throws Exception {
+    UserException ex = null;
+
+    try {
+      List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+      for (int i = 0; i < 100; i++) {
+        if ((i & (i + 1)) == 0) {
+          tasks.add(new TestTask(2000));
+        } else {
+          tasks.add(new TestTask(20000));
+        }
+      }
+
+      TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+    } catch (UserException e) {
+      ex = e;
+    }
+
+    assertNotNull("Expected a UserException", ex);
+    assertThat(ex.getMessage(),
+        containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
+            "complete. Total runnable size 100, parallelism 16."));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 8ef2af3..2fe6c28 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -156,6 +156,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       // calling some wait method?
       resultsListener.latch.await();
     } catch ( InterruptedException e ) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
       // Not normally expected--Drill doesn't interrupt in this area (right?)--
       // but JDBC client certainly could.
       throw new SQLException( "Interrupted", e );