You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/13 04:42:50 UTC

[1/6] drill git commit: DRILL-3051: Fix integer overflow in TimedRunnable

Repository: drill
Updated Branches:
  refs/heads/master 6d7cda8ea -> 83d8ebe60


DRILL-3051: Fix integer overflow in TimedRunnable

also fixes a similar overflow in Foreman


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83d8ebe6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83d8ebe6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83d8ebe6

Branch: refs/heads/master
Commit: 83d8ebe60d9e5e8c5b842dff223f87739aea0a6c
Parents: b3d097b
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 18:19:49 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/store/TimedRunnable.java |  2 +-
 .../org/apache/drill/exec/work/foreman/Foreman.java     |  2 +-
 .../org/apache/drill/exec/store/TestTimedRunnable.java  | 12 ++++++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index e52820b..240aaef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Lists;
  */
 public abstract class TimedRunnable<V> implements Runnable {
 
-  private static int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
 
   private volatile Exception e;
   private volatile long timeNanos;

http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index af3309f..cdf1276 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
   private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
-  private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
 
   private final QueryId queryId;
   private final RunQuery queryRequest;

http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
index 2807c35..44bb8b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -100,4 +100,16 @@ public class TestTimedRunnable extends DrillTest {
         containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
             "complete. Total runnable size 100, parallelism 16."));
   }
+
+  @Test
+  public void withManyTasks() throws Exception {
+
+    List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+    for (int i = 0; i < 150000; i++) {
+      tasks.add(new TestTask(0));
+    }
+
+    TimedRunnable.run("Execution with lots of tasks", logger, tasks, 16);
+  }
 }


[6/6] drill git commit: DRILL-3048: Disable assertions by default in distribution

Posted by sm...@apache.org.
DRILL-3048: Disable assertions by default in distribution


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/20b36885
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/20b36885
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/20b36885

Branch: refs/heads/master
Commit: 20b36885c12b93af1fcfeda81ac35eada8436903
Parents: 3616310
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 17:49:12 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-env.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/20b36885/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 345938e..53a3cb7 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -16,7 +16,7 @@
 DRILL_MAX_DIRECT_MEMORY="8G"
 DRILL_HEAP="4G"
 
-export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -Ddrill.exec.enable-epoll=true -ea"
+export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -Ddrill.exec.enable-epoll=true"
 
 # Class unloading is disabled by default in Java 7
 # http://hg.openjdk.java.net/jdk7u/jdk7u60/hotspot/file/tip/src/share/vm/runtime/globals.hpp#l1622


[2/6] drill git commit: DRILL-3049: Increase sort spooling threshold

Posted by sm...@apache.org.
DRILL-3049: Increase sort spooling threshold


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/01a36f1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/01a36f1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/01a36f1e

Branch: refs/heads/master
Commit: 01a36f1e81fee08a5f7af204ac8cbdbf8d74a0b2
Parents: 20b3688
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 17:55:54 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 exec/java-exec/src/main/resources/drill-module.conf | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/01a36f1e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b1e3fde..7630938 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -136,13 +136,13 @@ drill.exec: {
     cache_max_size: 1000
   },
   sort: {
-    purge.threshold : 100,
+    purge.threshold : 1000,
     external: {
       batch.size : 4000,
       spill: {
         batch.size : 4000,
-        group.size : 100,
-        threshold : 200,
+        group.size : 40000,
+        threshold : 40000,
         directories : [ "/tmp/drill/spill" ],
         fs : "file:///"
       }


[5/6] drill git commit: DRILL-3050: Increase QueryContext max allocation to 256MB

Posted by sm...@apache.org.
DRILL-3050: Increase QueryContext max allocation to 256MB


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b3d097b0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b3d097b0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b3d097b0

Branch: refs/heads/master
Commit: b3d097b03e37a57494262051a76a824a957e8605
Parents: 01a36f1
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 18:05:34 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/ops/QueryContext.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b3d097b0/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 8917a24..06f8088 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -54,7 +54,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
 
   private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
-  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+  private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 256 * 1024 * 1024;
 
   private final DrillbitContext drillbitContext;
   private final UserSession session;


[4/6] drill git commit: DRILL-2755: (part2) Use and handle InterruptedException during query processing

Posted by sm...@apache.org.
DRILL-2755: (part2) Use and handle InterruptedException during query processing


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36163105
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36163105
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36163105

Branch: refs/heads/master
Commit: 361631056ad00ab5f7df1eeed50d95f654f642a3
Parents: ed200e2
Author: vkorukanti <ve...@gmail.com>
Authored: Tue May 12 10:23:38 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  9 ++++
 .../drill/exec/ops/AccountingDataTunnel.java    | 11 ++++
 .../org/apache/drill/exec/ops/Consumer.java     |  5 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  8 +++
 .../apache/drill/exec/ops/StatusHandler.java    |  6 +++
 .../exec/physical/impl/SingleSenderCreator.java |  8 ++-
 .../impl/mergereceiver/MergingRecordBatch.java  | 10 ++++
 .../partitionsender/PartitionerDecorator.java   |  1 +
 .../UnorderedReceiverBatch.java                 | 10 ++++
 .../drill/exec/rpc/BaseRpcOutcomeListener.java  |  7 ++-
 .../org/apache/drill/exec/rpc/BasicClient.java  | 53 ++++++++++++++++----
 .../drill/exec/rpc/DrillRpcFutureImpl.java      | 17 +++----
 .../apache/drill/exec/rpc/FutureBitCommand.java |  6 +++
 .../apache/drill/exec/rpc/ListeningCommand.java |  4 ++
 .../drill/exec/rpc/ReconnectingConnection.java  | 51 ++++++++++++-------
 .../apache/drill/exec/rpc/RemoteConnection.java |  3 +-
 .../drill/exec/rpc/RpcOutcomeListener.java      | 14 +++++-
 .../apache/drill/exec/rpc/data/DataTunnel.java  | 42 +++++++++++++++-
 .../drill/exec/rpc/user/QueryResultHandler.java | 14 +++++-
 .../exec/testing/NoOpControlsInjector.java      |  5 ++
 .../apache/drill/exec/work/foreman/Foreman.java | 40 +++++++++++++--
 .../drill/exec/work/foreman/QueryManager.java   |  6 +++
 .../exec/work/fragment/FragmentExecutor.java    |  3 ++
 .../src/main/resources/drill-module.conf        |  5 +-
 .../org/apache/drill/exec/ZookeeperHelper.java  |  7 ++-
 .../apache/drill/exec/server/TestBitRpc.java    |  4 ++
 .../exec/server/TestDrillbitResilience.java     |  8 +++
 27 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1a10aa2..7bbb815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -86,6 +86,15 @@ public interface ExecConstants {
   public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
   public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+
+  /**
+   * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
+   * FINISHED we report the query result as CANCELLED by swallowing the failures occurred in fragments. This BOOT
+   * setting allows the user to see the query status as failure. Useful for developers/testers.
+   */
+  public static final String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
+      "drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
+
   /** Fragment memory planning */
   public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
   public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 2bcfdbc..2659464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -21,6 +21,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.slf4j.Logger;
 
 /**
  * Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent to
@@ -41,4 +44,12 @@ public class AccountingDataTunnel {
     sendingAccountor.increment();
     tunnel.sendRecordBatch(statusHandler, batch);
   }
+
+  /**
+   * See {@link DataTunnel#setTestInjectionControls(ExecutionControlsInjector, ExecutionControls, Logger)}.
+   */
+  public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+      final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+    tunnel.setTestInjectionControls(testInjector, testControls, testLogger);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
index 9b8ba38..0a1a397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
@@ -19,5 +19,8 @@ package org.apache.drill.exec.ops;
 
 //TODO replace this when we switch to JDK8, which includes this
 public interface Consumer<T> {
-  public void accept(T t);
+
+  void accept(T t);
+
+  void interrupt(final InterruptedException e);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index cf4e9bb..1cbe886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -90,6 +90,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     public void accept(final RpcException e) {
       fail(e);
     }
+
+    @Override
+    public void interrupt(final InterruptedException e) {
+      if (shouldContinue()) {
+        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
+        fail(e);
+      }
+    }
   };
 
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
index 79fc0b0..66e35b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
@@ -53,4 +53,10 @@ public class StatusHandler implements RpcOutcomeListener<Ack> {
     // if we didn't get ack ok, we'll need to kill the query.
     consumer.accept(new RpcException("Data not accepted downstream."));
   }
+
+  @Override
+  public void interrupted(final InterruptedException e) {
+    sendingAccountor.decrement();
+    consumer.interrupt(e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index fe6239e..1f6767c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
 
@@ -41,8 +42,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     return new SingleSenderRootExec(context, children.iterator().next(), config);
   }
 
-  private static class SingleSenderRootExec extends BaseRootExec {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+  public static class SingleSenderRootExec extends BaseRootExec {
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+    private static final ExecutionControlsInjector injector =
+        ExecutionControlsInjector.getInjector(SingleSenderRootExec.class);
 
     private final FragmentHandle oppositeHandle;
 
@@ -76,6 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
           .setMinorFragmentId(config.getOppositeMinorFragmentId())
           .build();
       tunnel = context.getDataTunnel(config.getDestination());
+      tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 611052b..6da132b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -534,6 +534,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
+  // TODO: Code duplication. UnorderedReceiverBatch has the same implementation.
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
@@ -545,6 +546,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      if (context.shouldContinue()) {
+        final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+        logger.error(errMsg, e);
+        context.fail(new RpcException(errMsg, e));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c355070..e210514 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -178,6 +178,7 @@ public class PartitionerDecorator {
         } catch (final InterruptedException e) {
           // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
           if (!context.shouldContinue()) {
+            logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
             for(Future f : taskFutures) {
               f.cancel(true);
             }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index e40fe54..1498441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -240,6 +240,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     }
   }
 
+  // TODO: Code duplication. MergingRecordBatch has the same implementation.
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
@@ -251,6 +252,15 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      if (context.shouldContinue()) {
+        final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+        logger.error(errMsg, e);
+        context.fail(new RpcException(errMsg, e));
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 9b071ad..dfc62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -32,5 +32,10 @@ public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
   public void success(T value, ByteBuf buffer) {
   }
 
-
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void interrupted(final InterruptedException ex) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index d551173..34758ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -194,19 +194,45 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
 
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
+        boolean isInterrupted = false;
+
+        final long timeoutMills = 30000;
+
+        // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+        // So there is no point propagating the interruption as failure immediately.
+        final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
         // logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
-        try {
-          future.get();
-          if (future.isSuccess()) {
-            // send a handshake on the current thread. This is the only time we will send from within the event thread.
-            // We can do this because the connection will not be backed up.
-            send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
-          } else {
-            l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+        while(true) {
+          try {
+            future.get(timeoutMills, TimeUnit.MILLISECONDS);
+            if (future.isSuccess()) {
+              // send a handshake on the current thread. This is the only time we will send from within the event thread.
+              // We can do this because the connection will not be backed up.
+              send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
+            } else {
+              l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+            }
+            // logger.debug("Handshake queued for send.");
+            break;
+          } catch (final InterruptedException interruptEx) {
+            // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+            isInterrupted = true;
+            final long wait = targetMillis - System.currentTimeMillis();
+            if (wait < 1) {
+              l.connectionFailed(FailureType.CONNECTION, interruptEx);
+              break;
+            }
+          } catch (final Exception ex) {
+            l.connectionFailed(FailureType.CONNECTION, ex);
+            break;
           }
-          // logger.debug("Handshake queued for send.");
-        } catch (Exception ex) {
-          l.connectionFailed(FailureType.CONNECTION, ex);
+        }
+
+        if (isInterrupted) {
+          // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+          // interruption and respond to it if it wants to.
+          Thread.currentThread().interrupt();
         }
       }
     }
@@ -235,6 +261,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         }
       }
 
+      @Override
+      public void interrupted(final InterruptedException ex) {
+        logger.warn("Interrupted while waiting for handshake response", ex);
+        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 19d9c30..cbe63c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -31,17 +31,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
     super(new InnerFuture<V>());
   }
 
-  /**
-   * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
-   * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
-   * will result in an UnsupportedOperationException.
-   */
-  @Override
-  public boolean cancel(boolean mayInterruptIfRunning) {
-    throw new UnsupportedOperationException(
-        "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
-  }
-
   @Override
   protected RpcException mapException(Exception ex) {
     return RpcException.mapException(ex);
@@ -72,6 +61,12 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
   }
 
   @Override
+  public void interrupted(final InterruptedException ex) {
+    // Propagate the interrupt to inner future
+    ( (InnerFuture<V>)delegate()).cancel(true);
+  }
+
+  @Override
   public ByteBuf getBuffer() {
     return buffer;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 6c7bf3e..53d0772 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -58,6 +58,12 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteCo
       settableFuture.set(value);
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // If we are interrupted while performing the command, consider as failure.
+      logger.warn("Interrupted while running the command", e);
+      failed(new RpcException(e));
+    }
   }
 
   public DrillRpcFuture<T> getFuture() {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index e32ca8a..92022b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -55,6 +55,10 @@ public abstract class ListeningCommand<T extends MessageLite, C extends RemoteCo
       listener.success(value, buf);
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      listener.interrupted(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index f0787a5..12e0063 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -22,6 +22,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
@@ -100,28 +102,44 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
      * Called by
      */
     public void waitAndRun() {
-      try {
-//        logger.debug("Waiting for connection.");
-        CONNECTION_TYPE connection = this.get();
-
-        if (connection == null) {
-//          logger.debug("Connection failed.");
-          return;
-        } else {
-//          logger.debug("Connection received. {}", connection);
-          cmd.connectionSucceeded(connection);
-//          logger.debug("Finished connection succeeded activity.");
+      boolean isInterrupted = false;
+
+      final long timeoutMills = 30000;
+      // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+      // So there is no point propagating the interruption as failure immediately.
+      final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
+      while(true) {
+        try {
+          //        logger.debug("Waiting for connection.");
+          CONNECTION_TYPE connection = this.get(timeoutMills, TimeUnit.MILLISECONDS);
+
+          if (connection == null) {
+            //          logger.debug("Connection failed.");
+          } else {
+            //          logger.debug("Connection received. {}", connection);
+            cmd.connectionSucceeded(connection);
+            //          logger.debug("Finished connection succeeded activity.");
+          }
+          break;
+        } catch (final InterruptedException interruptEx) {
+          // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+          isInterrupted = true;
+          final long wait = targetMillis - System.currentTimeMillis();
+          if (wait < 1) {
+            cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
+            break;
+          }
+        } catch (final ExecutionException | TimeoutException ex) {
+          cmd.connectionFailed(FailureType.CONNECTION, ex);
         }
-      } catch (final InterruptedException e) {
-        cmd.connectionFailed(FailureType.CONNECTION, e);
+      }
 
+      if (isInterrupted) {
         // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
         // interruption and respond to it if it wants to.
         Thread.currentThread().interrupt();
-      } catch (ExecutionException e) {
-        throw new IllegalStateException();
       }
-
     }
 
     @Override
@@ -243,7 +261,6 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
     public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
       delegate.connectionFailed(type, t);
     }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 2ee9263..199569c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -71,7 +71,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
       writeManager.waitForWritable();
       return true;
     }catch(final InterruptedException e){
-      listener.failed(new RpcException(e));
+      listener.interrupted(e);
 
       // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
       // interruption and respond to it if it wants to.
@@ -135,7 +135,6 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
       if (channel.isActive()) {
         channel.close().get();
       }
-      channel.close().get();
     } catch (final InterruptedException | ExecutionException e) {
       logger.warn("Caught exception while closing channel.", e);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 7d7c860..4485cf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -21,9 +21,19 @@ import io.netty.buffer.ByteBuf;
 
 public interface RpcOutcomeListener<V> {
 
-  public void failed(RpcException ex);
-  public void success(V value, ByteBuf buffer);
+  /**
+   * Called when an error occurred while waiting for the RPC outcome.
+   * @param ex
+   */
+  void failed(RpcException ex);
 
 
+  void success(V value, ByteBuf buffer);
+
+  /**
+   * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or
+   * failures.
+   */
+  void interrupted(final InterruptedException e);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index ed31bed..143020f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -31,6 +31,8 @@ import org.apache.drill.exec.rpc.ListeningCommand;
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 
 public class DataTunnel {
@@ -39,17 +41,49 @@ public class DataTunnel {
   private final DataConnectionManager manager;
   private final Semaphore sendingSemaphore = new Semaphore(3);
 
+  // Needed for injecting a test pause
+  private boolean isInjectionControlSet;
+  private ExecutionControlsInjector testInjector;
+  private ExecutionControls testControls;
+  private org.slf4j.Logger testLogger;
+
+
   public DataTunnel(DataConnectionManager manager) {
     this.manager = manager;
   }
 
+  /**
+   * Once a DataTunnel is created, clients of DataTunnel can pass injection controls to enable setting injections at
+   * pre-defined places. Currently following injection sites are available.
+   *
+   * 1. In method {@link #sendRecordBatch(RpcOutcomeListener, FragmentWritableBatch)}, an interruptible pause injection
+   *    is available before acquiring the sending slot. Site name is: "data-tunnel-send-batch-wait-for-interrupt"
+   *
+   * @param testInjector
+   * @param testControls
+   * @param testLogger
+   */
+  public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+      final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+    isInjectionControlSet = true;
+    this.testInjector = testInjector;
+    this.testControls = testControls;
+    this.testLogger = testLogger;
+  }
+
   public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
     SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
     try{
+      if (isInjectionControlSet) {
+        // Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for
+        // semaphore acquire. We expect the
+        testInjector.injectInterruptiblePause(testControls, "data-tunnel-send-batch-wait-for-interrupt", testLogger);
+      }
+
       sendingSemaphore.acquire();
       manager.runCommand(b);
     }catch(final InterruptedException e){
-      outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
+      outcomeListener.interrupted(e);
 
       // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
       // interruption and respond to it if it wants to.
@@ -57,6 +91,7 @@ public class DataTunnel {
     }
   }
 
+  // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
   public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
     SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
     try{
@@ -93,6 +128,11 @@ public class DataTunnel {
       inner.success(value, buffer);
     }
 
+    @Override
+    public void interrupted(InterruptedException e) {
+      sendingSemaphore.release();
+      inner.interrupted(e);
+    }
   }
 
   private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 143d104..8443948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -349,9 +349,21 @@ public class QueryResultHandler {
           throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
         }
       }
-
     }
 
+    @Override
+    public void interrupted(final InterruptedException ex) {
+      logger.warn("Interrupted while waiting for query results from Drillbit", ex);
+
+      if (!isTerminal.compareAndSet(false, true)) {
+        return;
+      }
+
+      closeFuture.removeListener(closeListener);
+
+      // Throw an interrupted UserException?
+      resultsListener.submissionFailed(UserException.systemError(ex).build());
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index bb13d1f..bf4221e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -42,6 +42,11 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
                                                final Logger logger) {
   }
 
+  @Override
+  public void injectInterruptiblePause(ExecutionControls executionControls, String desc, Logger logger)
+      throws InterruptedException {
+  }
+
   /**
    * When assertions are not enabled, this count down latch that does nothing is injected.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bf62ccb..af3309f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -610,6 +610,18 @@ public class Foreman implements Runnable {
     }
 
     /**
+     * Ignore the current status and force the given failure as current status.
+     * NOTE: Used only for testing purposes. Shouldn't be used in production.
+     */
+    public void setForceFailure(final Exception exception) {
+      Preconditions.checkArgument(exception != null);
+      Preconditions.checkState(!isClosed);
+
+      resultState = QueryState.FAILED;
+      resultException = exception;
+    }
+
+    /**
      * Add an exception to the result. All exceptions after the first become suppressed
      * exceptions hanging off the first.
      *
@@ -837,6 +849,14 @@ public class Foreman implements Runnable {
         if ((newState == QueryState.CANCELED)
             || (newState == QueryState.COMPLETED)
             || (newState == QueryState.FAILED)) {
+
+          if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+            if (newState == QueryState.FAILED) {
+              assert exception != null;
+              recordNewState(QueryState.FAILED);
+              foremanResult.setForceFailure(exception);
+            }
+          }
           /*
            * These amount to a completion of the cancellation requests' cleanup;
            * now we can clean up and send the result.
@@ -1109,6 +1129,15 @@ public class Foreman implements Runnable {
         stateListener.moveToState(QueryState.FAILED, ex);
       }
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
+      // Consider the interrupt as failure.
+      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+      logger.error(errMsg, e);
+      failed(new RpcException(errMsg, e));
+    }
   }
 
   /**
@@ -1139,10 +1168,15 @@ public class Foreman implements Runnable {
   private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
     @Override
     public void failed(final RpcException ex) {
-      logger.info(
-          "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
-          ex);
+      logger.info("Failure while trying communicate query result to initiating client. " +
+              "This would happen if a client is disconnected before response notice can be sent.", ex);
       stateListener.moveToState(QueryState.FAILED, ex);
     }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
+      stateListener.moveToState(QueryState.FAILED, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 84a38a6..eed4e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -267,6 +267,12 @@ public class QueryManager {
           ack);
       }
     }
+
+    @Override
+    public void interrupted(final InterruptedException ex) {
+      logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
+          "Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
+    }
   }
 
   QueryState updateEphemeralState(final QueryState queryState) {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2712fe7..6b44ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -145,6 +145,7 @@ public class FragmentExecutor implements Runnable {
        */
       final Thread myThread = myThreadRef.get();
       if (myThread != null) {
+        logger.debug("Interrupting fragment thread {}", myThread.getName());
         myThread.interrupt();
       }
     }
@@ -343,6 +344,8 @@ public class FragmentExecutor implements Runnable {
     case FINISHED:
       if(current == FragmentState.CANCELLATION_REQUESTED){
         target = FragmentState.CANCELLED;
+      } else if (current == FragmentState.FAILED) {
+        target = FragmentState.FAILED;
       }
       // fall-through
     case FAILED:

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index d98b97a..b1e3fde 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -159,5 +159,8 @@ drill.exec: {
       initial: 20000000
     }
   },
-  debug.error_on_leak: true
+  debug: {
+    error_on_leak: true,
+    return_error_for_failure_in_cancelled_fragments: false
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index 7fcf4cb..a5db81d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Throwables.propagate;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.util.MiniZooKeeperCluster;
@@ -45,7 +46,11 @@ public class ZookeeperHelper {
    * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
    */
   public ZookeeperHelper() {
-    config = DrillConfig.create();
+    final Properties overrideProps = new Properties();
+    // Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
+    // Setting this causes unittests to fail.
+    // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+    config = DrillConfig.create(overrideProps);
     zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
 
     if (!testDir.exists()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 3749716..72e6c44 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -148,6 +148,10 @@ public class TestBitRpc extends ExecTest {
       }
     }
 
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // TODO(We don't have any interrupts in test code)
+    }
   }
 
   private class BitComTestHandler implements DataResponseHandler {

http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index d72d498..3efb629 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
@@ -780,4 +781,11 @@ public class TestDrillbitResilience {
           Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
     }
   }
+
+  @Test
+  public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+    final String control =
+        createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+    assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+  }
 }


[3/6] drill git commit: DRILL-2780: Check for open files when closing OperatorContext

Posted by sm...@apache.org.
DRILL-2780: Check for open files when closing OperatorContext


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ed200e25
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ed200e25
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ed200e25

Branch: refs/heads/master
Commit: ed200e257249e67176a259bab0d607841bf37fa4
Parents: 6d7cda8
Author: Steven Phillips <sm...@apache.org>
Authored: Sun May 10 02:33:14 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/OperatorContext.java  |   5 +
 .../drill/exec/ops/OperatorContextImpl.java     |  22 ++++
 .../drill/exec/store/dfs/DrillFileSystem.java   |  42 +++----
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   2 +-
 .../exec/work/fragment/FragmentExecutor.java    |   2 +-
 .../drill/exec/testing/TestResourceLeak.java    | 126 +++++++++++++++++++
 .../resources/memory/tpch01_memory_leak.sql     |  22 ++++
 8 files changed, 195 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 35139d5..7eb7d8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
 
 public abstract class OperatorContext {
 
@@ -39,6 +42,8 @@ public abstract class OperatorContext {
 
   public abstract ExecutionControls getExecutionControls();
 
+  public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
+
   public static int getChildCount(PhysicalOperator popConfig) {
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 9fa8867..ce9f351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,14 +17,20 @@
  */
 package org.apache.drill.exec.ops;
 
+import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 import com.carrotsearch.hppc.LongObjectOpenHashMap;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
 
 class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
@@ -36,6 +42,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private OperatorStats stats;
   private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
   private final boolean applyFragmentLimit;
+  private DrillFileSystem fs;
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
     this.applyFragmentLimit=applyFragmentLimit;
@@ -108,6 +115,14 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     if (allocator != null) {
       allocator.close();
     }
+
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
     closed = true;
   }
 
@@ -115,4 +130,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return stats;
   }
 
+  @Override
+  public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+    fs = new DrillFileSystem(conf, getStats());
+    return fs;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index b6a9c30..25dd811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -71,30 +71,7 @@ import com.google.common.collect.Maps;
 public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
   private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
-  private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
-
-  static {
-    if (TRACKING_ENABLED) {
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        public void run() {
-          if (openedFiles.size() != 0) {
-            final StringBuffer errMsgBuilder = new StringBuffer();
-
-            errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
-                " still [%d] files open.\n", openedFiles.size()));
-
-            for(DebugStackTrace stackTrace : openedFiles.values()) {
-              stackTrace.addToStringBuilder(errMsgBuilder);
-            }
-
-            final String errMsg = errMsgBuilder.toString();
-            logger.error(errMsg);
-            throw new IllegalStateException(errMsg);
-          }
-        }
-      });
-    }
-  }
+  private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
 
   private final FileSystem underlyingFs;
   private final OperatorStats operatorStats;
@@ -420,7 +397,22 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   @Override
   public void close() throws IOException {
-    underlyingFs.close();
+    if (TRACKING_ENABLED) {
+      if (openedFiles.size() != 0) {
+        final StringBuffer errMsgBuilder = new StringBuffer();
+
+        errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+            " still [%d] files open.\n", openedFiles.size()));
+
+        for (DebugStackTrace stackTrace : openedFiles.values()) {
+          stackTrace.addToStringBuilder(errMsgBuilder);
+        }
+
+        final String errMsg = errMsgBuilder.toString();
+        logger.error(errMsg);
+        throw new IllegalStateException(errMsg);
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d1b46e1..233c32b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
                                                                        */);
     final DrillFileSystem dfs;
     try {
-      dfs = new DrillFileSystem(fsConf, oContext.getStats());
+      dfs = oContext.newFileSystem(fsConf);
     } catch (IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5e9c4ca..4423fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -100,7 +100,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     DrillFileSystem fs;
     try {
-      fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+      fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
     } catch(IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index d96e6d6..2712fe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -228,7 +228,6 @@ public class FragmentExecutor implements Runnable {
         }
       });
 
-      updateState(FragmentState.FINISHED);
     } catch (OutOfMemoryError | OutOfMemoryRuntimeException e) {
       if (!(e instanceof OutOfMemoryError) || "Direct buffer memory".equals(e.getMessage())) {
         fail(UserException.memoryError(e).build());
@@ -249,6 +248,7 @@ public class FragmentExecutor implements Runnable {
 
       closeOutResources();
 
+      updateState(FragmentState.FINISHED);
       // send the final state of the fragment. only the main execution thread can send the final state and it can
       // only be sent once.
       sendFinalState();

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
new file mode 100644
index 0000000..d7e317c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+public class TestResourceLeak extends DrillTest {
+
+  private static DrillClient client;
+  private static Drillbit bit;
+  private static RemoteServiceSet serviceSet;
+  private static DrillConfig config;
+  private static BufferAllocator allocator;
+
+  @SuppressWarnings("serial")
+  private static final Properties TEST_CONFIGURATIONS = new Properties() {
+    {
+      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+      put(ExecConstants.HTTP_ENABLE, "false");
+    }
+  };
+
+  @BeforeClass
+  public static void openClient() throws Exception {
+    config = DrillConfig.create(TEST_CONFIGURATIONS);
+    allocator = new TopLevelAllocator(config);
+    serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    bit = new Drillbit(config, serviceSet);
+    bit.run();
+    client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+  }
+
+  @Test()
+  public void tpch01() throws Exception {
+    final String query = getFile("memory/tpch01_memory_leak.sql");
+    try {
+      QueryTestUtil.test(client, "alter session set `planner.slice_target` = 10; " + query);
+    } catch (UserRemoteException e) {
+      if (e.getMessage().contains("Attempted to close accountor")) {
+        return;
+      }
+      throw e;
+    }
+    Assert.fail("Expected UserRemoteException indicating memory leak");
+  }
+
+  private static String getFile(String resource) throws IOException {
+    URL url = Resources.getResource(resource);
+    if (url == null) {
+      throw new IOException(String.format("Unable to find path %s.", resource));
+    }
+    return Resources.toString(url, Charsets.UTF_8);
+  }
+
+  @AfterClass
+  public static void closeClient() throws Exception {
+    try {
+      allocator.close();
+      serviceSet.close();
+      bit.close();
+      client.close();
+    } catch (IllegalStateException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @FunctionTemplate(name = "leakResource", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class Leak implements DrillSimpleFunc {
+
+    @Param Float8Holder in;
+    @Inject DrillBuf buf;
+    @Output Float8Holder out;
+
+    public void setup() {}
+
+    public void eval() {
+      buf.getAllocator().buffer(1);
+      out.value = in.value;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
new file mode 100644
index 0000000..12d30ab
--- /dev/null
+++ b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
@@ -0,0 +1,22 @@
+select
+  l_returnflag,
+  l_linestatus,
+  sum(leakResource(l_quantity)) as sum_qty,
+  sum(l_extendedprice) as sum_base_price,
+  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+  avg(l_quantity) as avg_qty,
+  avg(l_extendedprice) as avg_price,
+  avg(l_discount) as avg_disc,
+  count(*) as count_order
+from
+  cp.`tpch/lineitem.parquet`
+where
+  l_shipdate <= date '1998-12-01' - interval '120' day (3)
+group by
+  l_returnflag,
+  l_linestatus
+
+order by
+  l_returnflag,
+  l_linestatus;
\ No newline at end of file