You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:55 UTC

[10/12] drill git commit: DRILL-2755: Use and handle InterruptedException during query processing.

DRILL-2755: Use and handle InterruptedException during query processing.

- Interrupt FragmentExecutor thread as part of FragmentExecutor.cancel()
- Handle InterruptedException in ExternalSortBatch.newSV2(). If the fragment status says it
  should not continue, rethrow the InterruptedException to the caller, which returns IterOutcome.STOP
- Add comments regarding not handling InterruptedException in SendingAccountor.waitForSendComplete()
- Handle InterruptedException in OrderedPartitionRecordBatch.getPartitionVectors().
  If interrupted in Thread.sleep calls and the fragment status says it should not run, then
  return IterOutcome.STOP downstream.
- Interrupt partitioner threads if PartitionerRecordBatch is interrupted while waiting for
  partitioner threads to complete.
- Preserve interrupt status if not handled
- Handle null RecordBatches returned by RawBatchBuffer.getNext() in MergingRecordBatch.buildSchema()
- Change timeout in Foreman to be proportional to the number of intermediate fragments sent instead
  of hard coded limit of 90s.
- Change TimedRunnable to enforce a timeout of 15s per runnable.
  Total timeout is (15s * numOfRunnableTasks) / parallelism.
- Add unit tests
  * Testing cancelling a query interrupts the query fragments which are currently blocked
  * Testing interrupting the partitioner sender which in turn interrupts its helper threads
  * Testing TimedRunnable enforces timeout for the whole task list.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d5476995
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d5476995
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d5476995

Branch: refs/heads/merge_2015_05_09
Commit: d5476995d77192cf60dec9925db5597bafe9d751
Parents: 52dcd7e
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 6 09:34:25 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:29:58 2015 -0700

----------------------------------------------------------------------
 .../drill/common/concurrent/ExtendedLatch.java  |  18 +--
 .../apache/drill/exec/ops/SendingAccountor.java |   8 ++
 .../impl/mergereceiver/MergingRecordBatch.java  |  19 +++
 .../OrderedPartitionRecordBatch.java            |  31 ++++-
 .../partitionsender/PartitionerDecorator.java   |  70 +++++++++--
 .../partitionsender/PartitionerTemplate.java    |   3 +-
 .../UnorderedReceiverBatch.java                 |  22 +++-
 .../physical/impl/xsort/ExternalSortBatch.java  |  10 +-
 .../exec/record/RawFragmentBatchProvider.java   |   2 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |   9 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |   7 +-
 .../drill/exec/rpc/ReconnectingConnection.java  |   7 +-
 .../apache/drill/exec/rpc/RemoteConnection.java |  18 ++-
 .../apache/drill/exec/rpc/data/DataTunnel.java  |  14 ++-
 .../org/apache/drill/exec/server/Drillbit.java  |   4 +
 .../apache/drill/exec/store/TimedRunnable.java  |  52 +++++++--
 .../exec/testing/CountDownLatchInjection.java   |   5 +
 .../testing/CountDownLatchInjectionImpl.java    |   5 +
 .../exec/testing/ExecutionControlsInjector.java |  28 +++++
 .../exec/testing/NoOpControlsInjector.java      |   4 +
 .../drill/exec/testing/PauseInjection.java      |   7 ++
 .../org/apache/drill/exec/work/WorkManager.java |  24 ++--
 .../exec/work/batch/SpoolingRawBatchBuffer.java |   9 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |   8 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  14 ++-
 .../exec/work/fragment/FragmentExecutor.java    |  14 +++
 .../exec/server/TestDrillbitResilience.java     | 117 ++++++++++++++++++-
 .../drill/exec/store/TestTimedRunnable.java     | 103 ++++++++++++++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     |   4 +
 29 files changed, 540 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index a75ac32..6bbc476 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -35,16 +35,6 @@ public class ExtendedLatch extends CountDownLatch {
   }
 
   /**
-   * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
-   * state variable or similar.
-   *
-   * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
-   */
-  protected boolean ignoreInterruptions() {
-    return true;
-  }
-
-  /**
    * Await without interruption for a given time.
    * @param waitMillis
    *          Time in milliseconds to wait
@@ -77,12 +67,8 @@ public class ExtendedLatch extends CountDownLatch {
         await();
         return;
       } catch (final InterruptedException e) {
-        if (ignoreInterruptions()) {
-          // if we're still not ready, the while loop will cause us to wait again
-          logger.warn("Interrupted while waiting for event latch.", e);
-        } else {
-          return;
-        }
+        // if we're still not ready, the while loop will cause us to wait again
+        logger.warn("Interrupted while waiting for event latch.", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index 0cb5fbf..60ab06c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -47,7 +47,15 @@ class SendingAccountor {
           wait.acquire(waitForBatches);
           waitForBatches = batchesSent.addAndGet(-1 * waitForBatches);
         } catch (InterruptedException e) {
+          // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+          // to send. This should be safe because: network connections should get disconnected and fail a send if a
+          // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+          // (blocking is cooperative and will get triggered before this)
           logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
+
+          // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+          // interruption and respond to it if it wants to.
+          Thread.currentThread().interrupt();
         }
       }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f19f371..5d990f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -70,6 +70,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
@@ -88,6 +89,7 @@ import com.sun.codemodel.JExpr;
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(MergingRecordBatch.class);
 
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
@@ -141,6 +143,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
       final RawFragmentBatch b = provider.getNext();
       if (b != null) {
         stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
@@ -148,6 +151,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
       }
       return b;
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
     } finally {
       stats.stopWait();
     }
@@ -359,6 +368,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
             nextBatch = getNext(node.batchId);
           }
+
           assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
               : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
           if (nextBatch == null && !context.shouldContinue()) {
@@ -461,6 +471,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           return;
         }
         final RawFragmentBatch batch = getNext(i);
+        if (batch == null) {
+          if (!context.shouldContinue()) {
+            state = BatchState.STOP;
+          } else {
+            state = BatchState.DONE;
+          }
+
+          break;
+        }
         if (batch.getHeader().getDef().getFieldCount() == 0) {
           i++;
           continue;

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 63b7eba..ca6d83c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -246,6 +246,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   }
 
   /**
+   * Wait until at least the given timeout has expired, or until interrupted while the fragment status is not runnable.
+   * @param timeout Timeout in milliseconds.
+   * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+   */
+  private boolean waitUntilTimeOut(final long timeout) {
+    while(true) {
+      try {
+        Thread.sleep(timeout);
+        return true;
+      } catch (final InterruptedException e) {
+        if (!context.shouldContinue()) {
+          return false;
+        }
+      }
+    }
+  }
+
+  /**
    * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
    * that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
    * distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
@@ -255,10 +273,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * @return True is successful. False if failed.
    */
   private boolean getPartitionVectors() {
-
-
     try {
-
       if (!saveSamples()) {
         return false;
       }
@@ -279,14 +294,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         // TODO: this should be polling.
 
         if (val < fragmentsBeforeProceed) {
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         for (int i = 0; i < 100 && finalTable == null; i++) {
           finalTable = tableMap.get(finalTableKey);
           if (finalTable != null) {
             break;
           }
-          Thread.sleep(10);
+          if (!waitUntilTimeOut(10)) {
+            return false;
+          }
         }
         if (finalTable == null) {
           buildTable();
@@ -302,7 +321,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         partitionVectors.add(w.getValueVector());
       }
 
-    } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
+    } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
       kill(false);
       context.fail(ex);
       return false;

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c3261dc..c355070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -28,6 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.testing.CountDownLatchInjection;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 /**
  * Decorator class to hide multiple Partitioner existence from the caller
@@ -38,19 +41,22 @@ import com.google.common.collect.Lists;
  * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
  */
 public class PartitionerDecorator {
-
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+  private static final ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(PartitionerDecorator.class);
 
   private List<Partitioner> partitioners;
   private final OperatorStats stats;
   private final String tName;
   private final String childThreadPrefix;
   private final ExecutorService executor;
+  private final FragmentContext context;
 
 
   public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
     this.partitioners = partitioners;
     this.stats = stats;
+    this.context = context;
     this.executor = context.getDrillbitContext().getExecutor();
     this.tName = Thread.currentThread().getName();
     this.childThreadPrefix = "Partitioner-" + tName + "-";
@@ -145,17 +151,42 @@ public class PartitionerDecorator {
     stats.startWait();
     final CountDownLatch latch = new CountDownLatch(partitioners.size());
     final List<CustomRunnable> runnables = Lists.newArrayList();
+    final List<Future> taskFutures = Lists.newArrayList();
+    CountDownLatchInjection testCountDownLatch = null;
     try {
-      int i = 0;
-      for (final Partitioner part : partitioners ) {
-        runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
-        executor.submit(runnables.get(i++));
+      // To simulate interruption of the main fragment thread and of the partitioner threads, create a
+      // CountDownLatchInjection latch. Partitioner threads await on the latch and the main fragment thread counts down
+      // or interrupts the waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
+      testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
+      testCountDownLatch.initialize(1);
+      for (final Partitioner part : partitioners) {
+        final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
+        runnables.add(runnable);
+        taskFutures.add(executor.submit(runnable));
       }
-      try {
-        latch.await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+
+      while (true) {
+        try {
+          // Wait for main fragment interruption.
+          injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+
+          // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
+          injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
+
+          latch.await();
+          break;
+        } catch (final InterruptedException e) {
+          // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
+          if (!context.shouldContinue()) {
+            for(Future f : taskFutures) {
+              f.cancel(true);
+            }
+
+            break;
+          }
+        }
       }
+
       IOException excep = null;
       for (final CustomRunnable runnable : runnables ) {
         IOException myException = runnable.getException();
@@ -180,8 +211,12 @@ public class PartitionerDecorator {
       // scale down main stats wait time based on calculated processing time
       // since we did not wait for whole duration of above execution
       stats.adjustWaitNanos(-maxProcessTime);
-    }
 
+      // Done with the latch, close it.
+      if (testCountDownLatch != null) {
+        testCountDownLatch.close();
+      }
+    }
   }
 
   /**
@@ -190,7 +225,7 @@ public class PartitionerDecorator {
    * protected is for testing purposes
    */
   protected interface GeneralExecuteIface {
-    public void execute(Partitioner partitioner) throws IOException;
+    void execute(Partitioner partitioner) throws IOException;
   }
 
   /**
@@ -242,17 +277,28 @@ public class PartitionerDecorator {
     private final CountDownLatch latch;
     private final GeneralExecuteIface iface;
     private final Partitioner part;
+    private CountDownLatchInjection testCountDownLatch;
+
     private volatile IOException exp;
 
-    public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
+        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
       this.parentThreadName = parentThreadName;
       this.latch = latch;
       this.iface = iface;
       this.part = part;
+      this.testCountDownLatch = testCountDownLatch;
     }
 
     @Override
     public void run() {
+      // Test only - Pause until interrupted by fragment thread
+      try {
+        testCountDownLatch.await();
+      } catch (final InterruptedException e) {
+        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+      }
+
       final Thread currThread = Thread.currentThread();
       final String currThreadName = currThread.getName();
       final OperatorStats localStats = part.getStats();

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index cbea267..aeac01d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -285,7 +285,8 @@ public abstract class PartitionerTemplate implements Partitioner {
       //      to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
       //      sender has acknowledged the terminate request. After sending the last batch, all further batches are
       //      dropped.
-      final boolean isLastBatch = isLast || terminated;
+      //   3. Partitioner thread is interrupted due to cancellation of fragment.
+      final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
 
       // if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
       if (!isLastBatch && recordCount == 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 66a2092..e40fe54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -49,9 +49,12 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class UnorderedReceiverBatch implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private final static ExecutionControlsInjector injector =
+      ExecutionControlsInjector.getInjector(UnorderedReceiverBatch.class);
 
   private final RecordBatchLoader batchLoader;
   private final RawFragmentBatchProvider fragProvider;
@@ -133,6 +136,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
+  private RawFragmentBatch getNextBatch() throws IOException {
+    try {
+      injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
+      return fragProvider.getNext();
+    } catch(final InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
+    }
+  }
+
   @Override
   public IterOutcome next() {
     stats.startProcessing();
@@ -140,11 +156,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       RawFragmentBatch batch;
       try {
         stats.startWait();
-        batch = fragProvider.getNext();
+        batch = getNextBatch();
 
         // skip over empty batches. we do this since these are basically control messages.
         while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
-          batch = fragProvider.getNext();
+          batch = getNextBatch();
         }
       } finally {
         stats.stopWait();

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index aab3391..3159811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -277,6 +277,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           } else {
             try {
               sv2 = newSV2();
+            } catch(InterruptedException e) {
+              return IterOutcome.STOP;
             } catch (OutOfMemoryException e) {
               throw new OutOfMemoryRuntimeException(e);
             }
@@ -496,7 +498,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return size;
   }
 
-  private SelectionVector2 newSV2() throws OutOfMemoryException {
+  private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
     if (!sv2.allocateNew(incoming.getRecordCount())) {
       try {
@@ -509,8 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       while (true) {
         try {
           Thread.sleep(waitTime * 1000);
-        } catch (InterruptedException e) {
-          throw new OutOfMemoryException(e);
+        } catch(final InterruptedException e) {
+          if (!context.shouldContinue()) {
+            throw e;
+          }
         }
         waitTime *= 2;
         if (sv2.allocateNew(incoming.getRecordCount())) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index d4dfe96..030785c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 
 public interface RawFragmentBatchProvider {
 
-  public RawFragmentBatch getNext() throws IOException;
+  public RawFragmentBatch getNext() throws IOException, InterruptedException;
   public void kill(FragmentContext context);
   public void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 1661f81..d551173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -263,9 +263,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     logger.debug("Closing client");
     try {
       connection.getChannel().close().get();
-    } catch (InterruptedException | ExecutionException e) {
-      logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
-      // TODO InterruptedException
+    } catch (final InterruptedException | ExecutionException e) {
+      logger.warn("Failure while shutting {}", this.getClass().getName(), e);
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a148436..6a7bc65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -204,9 +204,12 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   public void close() throws IOException {
     try {
       eventLoopGroup.shutdownGracefully().get();
-    } catch (InterruptedException | ExecutionException e) {
+    } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index 9948d3e..f0787a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -112,9 +112,12 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
           cmd.connectionSucceeded(connection);
 //          logger.debug("Finished connection succeeded activity.");
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         cmd.connectionFailed(FailureType.CONNECTION, e);
-        // TODO InterruptedException
+
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
       } catch (ExecutionException e) {
         throw new IllegalStateException();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0f095c0..2ee9263 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -70,9 +70,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
     try{
       writeManager.waitForWritable();
       return true;
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       listener.failed(new RpcException(e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
       return false;
     }
   }
@@ -131,10 +135,14 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
       if (channel.isActive()) {
         channel.close().get();
       }
-    } catch (InterruptedException | ExecutionException e) {
+      channel.close().get();
+    } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 11f5496..ed31bed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -48,9 +48,12 @@ public class DataTunnel {
     try{
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -59,9 +62,12 @@ public class DataTunnel {
     try{
       sendingSemaphore.acquire();
       manager.runCommand(b);
-    }catch(InterruptedException e){
+    }catch(final InterruptedException e){
       b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
-      // TODO InterruptedException
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
     return b.getFuture();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index e7a9a3c..531253e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -271,6 +271,10 @@ public class Drillbit implements AutoCloseable {
       Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
     } catch (final InterruptedException e) {
       logger.warn("Interrupted while sleeping during coordination deregistration.");
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
 
     if (embeddedJetty != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 0fb778b..e52820b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.UserException;
 import org.slf4j.Logger;
 
 import com.google.common.base.Stopwatch;
@@ -36,6 +41,8 @@ import com.google.common.collect.Lists;
  */
 public abstract class TimedRunnable<V> implements Runnable {
 
+  private static int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+
   private volatile Exception e;
   private volatile long timeNanos;
   private volatile V value;
@@ -91,10 +98,13 @@ public abstract class TimedRunnable<V> implements Runnable {
   }
 
   /**
-   * Execute the list of runnables with the given parallelization.  At end, return values and report completion time stats to provided logger.
+   * Execute the list of runnables with the given parallelization.  At end, return values and report completion time
+   * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout is exceeded, existing/pending
+   * tasks will be cancelled and a {@link UserException} is thrown.
    * @param activity Name of activity for reporting in logger.
    * @param logger The logger to use to report results.
-   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be completed in-thread.
+   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be
+   *                  completed in-thread. Runnable must handle {@link InterruptedException}s.
    * @param parallelism  The number of threads that should be run to complete this task.
    * @return The list of outcome objects.
    * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
@@ -107,25 +117,43 @@ public abstract class TimedRunnable<V> implements Runnable {
       runnables.get(0).run();
     }else{
       parallelism = Math.min(parallelism,  runnables.size());
-      final CountDownLatch latch = new CountDownLatch(runnables.size());
+      final ExtendedLatch latch = new ExtendedLatch(runnables.size());
       final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
       try{
         for(TimedRunnable<V> runnable : runnables){
           threadPool.submit(new LatchedRunnable(latch, runnable));
         }
-      }finally{
-        threadPool.shutdown();
-      }
 
-      try{
-        latch.await();
-      }catch(InterruptedException e){
-        // TODO interrupted exception.
-        throw new RuntimeException(e);
+        final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism);
+        if (!latch.awaitUninterruptibly(timeout)) {
+          // Issue a shutdown request. This will cause running tasks to be interrupted and pending tasks to be cancelled.
+          // It is highly important that the task Runnables are handling interrupts correctly.
+          threadPool.shutdownNow();
+
+          try {
+            // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+            // any running threads. If the runnables are handling the interrupts properly they should be able to
+            // wrap up and terminate. If not, waiting for 5s here gives a chance to identify and log any potential
+            // thread leaks.
+            threadPool.awaitTermination(5, TimeUnit.SECONDS);
+          } catch (final InterruptedException e) {
+            logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
+          }
+
+          final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
+              "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
+          logger.error(errMsg);
+          throw UserException.resourceError()
+              .message(errMsg)
+              .build();
+        }
+      } finally {
+        if (!threadPool.isShutdown()) {
+          threadPool.shutdown();
+        }
       }
     }
 
-
     List<V> values = Lists.newArrayList();
     long sum = 0;
     long max = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index de4a181..d26e2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -48,4 +48,9 @@ public interface CountDownLatchInjection {
    * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
    */
   void countDown();
+
+  /**
+   * Close the latch.
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index f4012c1..561d816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -82,4 +82,9 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
     Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
     latch.countDown();
   }
+
+  @Override
+  public void close() {
+    latch = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 05f8433..387d300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -124,6 +124,34 @@ public class ExecutionControlsInjector {
     }
   }
 
+  /**
+   * Insert a pause that can be interrupted using {@link Thread#interrupt()} at the given site point, if such an
+   * injection is specified (i.e. matches the site description).
+   * <p/>
+   * <p>Implementors use this in their code at a site where they want to simulate an interruptible pause
+   * during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param logger            logger of the class containing the injection site
+   * @throws InterruptedException if interrupted using {@link Thread#interrupt()}
+   */
+  public void injectInterruptiblePause(final ExecutionControls executionControls, final String desc,
+      final Logger logger) throws InterruptedException {
+    final PauseInjection pauseInjection = executionControls.lookupPauseInjection(this, desc);
+
+    if (pauseInjection != null) {
+      logger.debug("Interruptible pausing at {}", desc);
+      try {
+        pauseInjection.interruptiblePause();
+      } catch (final InterruptedException e) {
+        logger.debug("Pause interrupted at {}", desc);
+        throw e;
+      }
+      logger.debug("Interruptible pause resuming at {}", desc);
+    }
+  }
+
   public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
     return executionControls.lookupCountDownLatchInjection(this, desc);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 33ab783..bb13d1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -61,6 +61,10 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
     @Override
     public void countDown() {
     }
+
+    @Override
+    public void close() {
+    }
   };
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index ff0340b..fc4d8ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -54,6 +54,13 @@ public class PauseInjection extends Injection {
     latch.awaitUninterruptibly();
   }
 
+  public void interruptiblePause() throws InterruptedException {
+    if (!injectNow()) {
+      return;
+    }
+    latch.await();
+  }
+
   public void unpause() {
     latch.countDown();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index f2352e6..1d3a0b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,13 +21,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.SelfCleaningRunnable;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -173,6 +173,10 @@ public class WorkManager implements AutoCloseable {
       }
     } catch (final InterruptedException e) {
       logger.warn("Executor interrupted while awaiting termination");
+
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -180,7 +184,7 @@ public class WorkManager implements AutoCloseable {
     return dContext;
   }
 
-  private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+  private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running
 
   /**
    * Waits until it is safe to exit. Blocks until all currently running fragments have completed.
@@ -193,17 +197,11 @@ public class WorkManager implements AutoCloseable {
         return;
       }
 
-      exitLatch = new CountDownLatch(1);
+      exitLatch = new ExtendedLatch();
     }
 
-    while(true) {
-      try {
-        exitLatch.await(5, TimeUnit.SECONDS);
-      } catch(final InterruptedException e) {
-        // keep waiting
-      }
-      break;
-    }
+    // Wait for at most 5 seconds or until the latch is released.
+    exitLatch.awaitUninterruptibly(5000);
   }
 
   /**
@@ -328,6 +326,10 @@ public class WorkManager implements AutoCloseable {
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
         } catch(final InterruptedException e) {
+          // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+          // interruption and respond to it if it wants to.
+          Thread.currentThread().interrupt();
+
           // exit status thread on interrupt.
           break;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 2a79e42..07a3505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -141,7 +141,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
   }
 
   @Override
-  public RawFragmentBatch getNext() throws IOException {
+  public RawFragmentBatch getNext() throws IOException, InterruptedException {
     if (outOfMemory && buffer.size() < 10) {
       outOfMemory = false;
       fragmentManager.setAutoRead(true);
@@ -160,9 +160,12 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
         }
         queueSize -= w.getBodySize();
         return batch;
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
+        // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+        // interruption and respond to it if it wants to.
+        Thread.currentThread().interrupt();
+
         return null;
-        // TODO InterruptedException
       }
     }
     if (w == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index d23655c..4750666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
 
   private static enum BufferState {
     INIT,
@@ -149,7 +149,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public RawFragmentBatch getNext() {
+  public RawFragmentBatch getNext() throws IOException, InterruptedException {
 
     if (outOfMemory.get() && buffer.size() < 10) {
       logger.trace("Setting autoread true");
@@ -166,8 +166,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       try {
         b = buffer.take();
       } catch (final InterruptedException e) {
-        return null;
-        // TODO InterruptedException
+        logger.debug("Interrupted while waiting for incoming data.", e);
+        throw e;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0122ef8..bf62ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
   private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
-  private static final int RPC_WAIT_IN_SECONDS = 90;
+  private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
 
   private final QueryId queryId;
   private final RunQuery queryRequest;
@@ -967,7 +967,8 @@ public class Foreman implements Runnable {
      * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
      * know if any submissions did fail.
      */
-    final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
+    final int numIntFragments = intFragmentMap.keySet().size();
+    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
@@ -975,16 +976,17 @@ public class Foreman implements Runnable {
       sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
-    if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+    if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
       long numberRemaining = endpointLatch.getCount();
       throw UserException.connectionError()
           .message(
-              "Exceeded timeout while waiting send intermediate work fragments to remote nodes.  Sent %d and only heard response back from %d nodes.",
-              intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+              "Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+                  "Sent %d and only heard response back from %d nodes.",
+              timeout, numIntFragments, numIntFragments - numberRemaining)
           .build();
     }
 
-
     // if any of the intermediate fragment submissions failed, fail the query
     final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
     if (submissionExceptions.size() > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 24e2556..d96e6d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -62,6 +62,9 @@ public class FragmentExecutor implements Runnable {
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
   private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
 
+  // Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or has finished
+  private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
+
   public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
                           final StatusReporter listener) {
     this.fragmentContext = context;
@@ -136,6 +139,14 @@ public class FragmentExecutor implements Runnable {
        * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
        */
       updateState(FragmentState.CANCELLATION_REQUESTED);
+
+      /*
+       * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+       */
+      final Thread myThread = myThreadRef.get();
+      if (myThread != null) {
+        myThread.interrupt();
+      }
     }
   }
 
@@ -168,6 +179,7 @@ public class FragmentExecutor implements Runnable {
   @Override
   public void run() {
     final Thread myThread = Thread.currentThread();
+    myThreadRef.set(myThread);
     final String originalThreadName = myThread.getName();
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
     final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
@@ -244,6 +256,8 @@ public class FragmentExecutor implements Runnable {
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
       myThread.setName(originalThreadName);
+
+      myThreadRef.set(null);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 3e4dcb2..d72d498 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.server;
 
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -43,6 +47,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -437,6 +445,20 @@ public class TestDrillbitResilience {
     }
   }
 
+  private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+    private boolean cancelRequested = false;
+
+    @Override
+    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+      if (!cancelRequested) {
+        check(queryId != null, "Query id should not be null, since we have waited long enough.");
+        (new CancellingThread(queryId, ex, null)).start();
+        cancelRequested = true;
+      }
+      result.release();
+    }
+  };
+
   /**
    * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
    */
@@ -459,7 +481,9 @@ public class TestDrillbitResilience {
       } catch (final RpcException ex) {
         this.ex.value = ex;
       }
-      latch.countDown();
+      if (latch != null) {
+        latch.countDown();
+      }
     }
   }
 
@@ -507,13 +531,33 @@ public class TestDrillbitResilience {
    * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
    */
   private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    assertCancelled(controls, TEST_QUERY, listener);
+  }
+
+  /**
+   * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
+   */
+  private static void assertCancelled(final String controls, final String testQuery,
+      final WaitUntilCompleteListener listener) {
     setControls(controls);
 
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, testQuery, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     assertCompleteState(result, QueryState.CANCELED);
   }
 
+  private static void setSessionOption(final String option, final String value) {
+    try {
+      final List<QueryDataBatch> results = drillClient.runQuery(QueryType.SQL,
+          String.format("alter session set `%s` = %s", option, value));
+      for (final QueryDataBatch data : results) {
+        data.release();
+      }
+    } catch(RpcException e) {
+      fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString()));
+    }
+  }
+
   private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
     return "{\"injections\" : [{"
       + "\"type\" : \"pause\"," +
@@ -667,4 +711,73 @@ public class TestDrillbitResilience {
     final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
   }
+
+  /**
+   * Tests that cancelling a query interrupts FragmentExecutor threads that are blocked waiting for some event.
+   * Specifically tests cancelling a fragment which has a {@link MergingRecordBatch} blocked waiting for data.
+   */
+  @Test
+  public void testInterruptingBlockedMergingRecordBatch() {
+    final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
+    testInterruptingBlockedFragmentsWaitingForData(control);
+  }
+
+  /**
+   * Tests that cancelling a query interrupts FragmentExecutor threads that are blocked waiting for some event.
+   * Specifically tests cancelling a fragment which has an {@link UnorderedReceiverBatch} blocked waiting for data.
+   */
+  @Test
+  public void testInterruptingBlockedUnorderedReceiverBatch() {
+    final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
+    testInterruptingBlockedFragmentsWaitingForData(control);
+  }
+
+  private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
+    try {
+      setSessionOption(SLICE_TARGET, "1");
+      setSessionOption(HASHAGG.getOptionName(), "false");
+
+      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+      assertCancelled(control, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+    }
+  }
+
+  /**
+   * Tests interrupting the fragment thread that is running {@link PartitionSenderRootExec}.
+   * {@link PartitionSenderRootExec} spawns threads for the partitioner. Interrupting the fragment thread should
+   * also interrupt the partitioner threads.
+   */
+  @Test
+  public void testInterruptingPartitionerThreadFragment() {
+    try {
+      setSessionOption(SLICE_TARGET, "1");
+      setSessionOption(HASHAGG.getOptionName(), "true");
+      setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+
+      final String controls = "{\"injections\" : ["
+          + "{"
+          + "\"type\" : \"latch\","
+          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+          + "\"desc\" : \"partitioner-sender-latch\""
+          + "},"
+          + "{"
+          + "\"type\" : \"pause\","
+          + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+          + "\"desc\" : \"wait-for-fragment-interrupt\","
+          + "\"nSkip\" : 1"
+          + "}" +
+          "]}";
+
+      final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+      assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+      setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(),
+          Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
new file mode 100644
index 0000000..2807c35
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.test.DrillTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit testing for {@link TimedRunnable}.
+ */
+public class TestTimedRunnable extends DrillTest {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+
+  @Rule
+  public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
+
+  private static class TestTask extends TimedRunnable {
+    final long sleepTime; // sleep time in ms
+
+    public TestTask(final long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+
+    @Override
+    protected Void runInner() throws Exception {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        throw e;
+      }
+      return null;
+    }
+
+    @Override
+    protected IOException convertToIOException(Exception e) {
+      return new IOException("Failure while trying to sleep for sometime", e);
+    }
+  }
+
+  @Test
+  public void withoutAnyTasksTriggeringTimeout() throws Exception {
+    List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+    for(int i=0; i<100; i++){
+      tasks.add(new TestTask(2000));
+    }
+
+    TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16);
+  }
+
+  @Test
+  public void withTasksExceedingTimeout() throws Exception {
+    UserException ex = null;
+
+    try {
+      List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+      for (int i = 0; i < 100; i++) {
+        if ((i & (i + 1)) == 0) {
+          tasks.add(new TestTask(2000));
+        } else {
+          tasks.add(new TestTask(20000));
+        }
+      }
+
+      TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+    } catch (UserException e) {
+      ex = e;
+    }
+
+    assertNotNull("Expected a UserException", ex);
+    assertThat(ex.getMessage(),
+        containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
+            "complete. Total runnable size 100, parallelism 16."));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d5476995/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 8ef2af3..2fe6c28 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -156,6 +156,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
       // calling some wait method?
       resultsListener.latch.await();
     } catch ( InterruptedException e ) {
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
       // Not normally expected--Drill doesn't interrupt in this area (right?)--
       // but JDBC client certainly could.
       throw new SQLException( "Interrupted", e );