You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/13 04:42:50 UTC
[1/6] drill git commit: DRILL-3051: Fix integer overflow in
TimedRunnable
Repository: drill
Updated Branches:
refs/heads/master 6d7cda8ea -> 83d8ebe60
DRILL-3051: Fix integer overflow in TimedRunnable
also fixes a similar overflow in Foreman
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83d8ebe6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83d8ebe6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83d8ebe6
Branch: refs/heads/master
Commit: 83d8ebe60d9e5e8c5b842dff223f87739aea0a6c
Parents: b3d097b
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 18:19:49 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/drill/exec/store/TimedRunnable.java | 2 +-
.../org/apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../org/apache/drill/exec/store/TestTimedRunnable.java | 12 ++++++++++++
3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index e52820b..240aaef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Lists;
*/
public abstract class TimedRunnable<V> implements Runnable {
- private static int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+ private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
private volatile Exception e;
private volatile long timeNanos;
http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index af3309f..cdf1276 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
private static final ObjectMapper MAPPER = new ObjectMapper();
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
- private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+ private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
private final QueryId queryId;
private final RunQuery queryRequest;
http://git-wip-us.apache.org/repos/asf/drill/blob/83d8ebe6/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
index 2807c35..44bb8b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -100,4 +100,16 @@ public class TestTimedRunnable extends DrillTest {
containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
"complete. Total runnable size 100, parallelism 16."));
}
+
+ @Test
+ public void withManyTasks() throws Exception {
+
+ List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+ for (int i = 0; i < 150000; i++) {
+ tasks.add(new TestTask(0));
+ }
+
+ TimedRunnable.run("Execution with lots of tasks", logger, tasks, 16);
+ }
}
[6/6] drill git commit: DRILL-3048: Disable assertions by default in
distribution
Posted by sm...@apache.org.
DRILL-3048: Disable assertions by default in distribution
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/20b36885
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/20b36885
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/20b36885
Branch: refs/heads/master
Commit: 20b36885c12b93af1fcfeda81ac35eada8436903
Parents: 3616310
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 17:49:12 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
distribution/src/resources/drill-env.sh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/20b36885/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 345938e..53a3cb7 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -16,7 +16,7 @@
DRILL_MAX_DIRECT_MEMORY="8G"
DRILL_HEAP="4G"
-export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -Ddrill.exec.enable-epoll=true -ea"
+export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -Ddrill.exec.enable-epoll=true"
# Class unloading is disabled by default in Java 7
# http://hg.openjdk.java.net/jdk7u/jdk7u60/hotspot/file/tip/src/share/vm/runtime/globals.hpp#l1622
[2/6] drill git commit: DRILL-3049: Increase sort spooling threshold
Posted by sm...@apache.org.
DRILL-3049: Increase sort spooling threshold
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/01a36f1e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/01a36f1e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/01a36f1e
Branch: refs/heads/master
Commit: 01a36f1e81fee08a5f7af204ac8cbdbf8d74a0b2
Parents: 20b3688
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 17:55:54 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
exec/java-exec/src/main/resources/drill-module.conf | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/01a36f1e/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b1e3fde..7630938 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -136,13 +136,13 @@ drill.exec: {
cache_max_size: 1000
},
sort: {
- purge.threshold : 100,
+ purge.threshold : 1000,
external: {
batch.size : 4000,
spill: {
batch.size : 4000,
- group.size : 100,
- threshold : 200,
+ group.size : 40000,
+ threshold : 40000,
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
[5/6] drill git commit: DRILL-3050: Increase QueryContext max
allocation to 256MB
Posted by sm...@apache.org.
DRILL-3050: Increase QueryContext max allocation to 256MB
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b3d097b0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b3d097b0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b3d097b0
Branch: refs/heads/master
Commit: b3d097b03e37a57494262051a76a824a957e8605
Parents: 01a36f1
Author: Steven Phillips <sm...@apache.org>
Authored: Tue May 12 18:05:34 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/ops/QueryContext.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/b3d097b0/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 8917a24..06f8088 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -54,7 +54,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
- private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
+ private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 256 * 1024 * 1024;
private final DrillbitContext drillbitContext;
private final UserSession session;
[4/6] drill git commit: DRILL-2755: (part2) Use and handle
InterruptedException during query processing
Posted by sm...@apache.org.
DRILL-2755: (part2) Use and handle InterruptedException during query processing
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36163105
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36163105
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36163105
Branch: refs/heads/master
Commit: 361631056ad00ab5f7df1eeed50d95f654f642a3
Parents: ed200e2
Author: vkorukanti <ve...@gmail.com>
Authored: Tue May 12 10:23:38 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 9 ++++
.../drill/exec/ops/AccountingDataTunnel.java | 11 ++++
.../org/apache/drill/exec/ops/Consumer.java | 5 +-
.../apache/drill/exec/ops/FragmentContext.java | 8 +++
.../apache/drill/exec/ops/StatusHandler.java | 6 +++
.../exec/physical/impl/SingleSenderCreator.java | 8 ++-
.../impl/mergereceiver/MergingRecordBatch.java | 10 ++++
.../partitionsender/PartitionerDecorator.java | 1 +
.../UnorderedReceiverBatch.java | 10 ++++
.../drill/exec/rpc/BaseRpcOutcomeListener.java | 7 ++-
.../org/apache/drill/exec/rpc/BasicClient.java | 53 ++++++++++++++++----
.../drill/exec/rpc/DrillRpcFutureImpl.java | 17 +++----
.../apache/drill/exec/rpc/FutureBitCommand.java | 6 +++
.../apache/drill/exec/rpc/ListeningCommand.java | 4 ++
.../drill/exec/rpc/ReconnectingConnection.java | 51 ++++++++++++-------
.../apache/drill/exec/rpc/RemoteConnection.java | 3 +-
.../drill/exec/rpc/RpcOutcomeListener.java | 14 +++++-
.../apache/drill/exec/rpc/data/DataTunnel.java | 42 +++++++++++++++-
.../drill/exec/rpc/user/QueryResultHandler.java | 14 +++++-
.../exec/testing/NoOpControlsInjector.java | 5 ++
.../apache/drill/exec/work/foreman/Foreman.java | 40 +++++++++++++--
.../drill/exec/work/foreman/QueryManager.java | 6 +++
.../exec/work/fragment/FragmentExecutor.java | 3 ++
.../src/main/resources/drill-module.conf | 5 +-
.../org/apache/drill/exec/ZookeeperHelper.java | 7 ++-
.../apache/drill/exec/server/TestBitRpc.java | 4 ++
.../exec/server/TestDrillbitResilience.java | 8 +++
27 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1a10aa2..7bbb815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -86,6 +86,15 @@ public interface ExecConstants {
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+
+ /**
+ * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
+ * FINISHED we report the query result as CANCELLED by swallowing the failures occurred in fragments. This BOOT
+ * setting allows the user to see the query status as failure. Useful for developers/testers.
+ */
+ public static final String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
+ "drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
+
/** Fragment memory planning */
public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 2bcfdbc..2659464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -21,6 +21,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.slf4j.Logger;
/**
* Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent to
@@ -41,4 +44,12 @@ public class AccountingDataTunnel {
sendingAccountor.increment();
tunnel.sendRecordBatch(statusHandler, batch);
}
+
+ /**
+ * See {@link DataTunnel#setTestInjectionControls(ExecutionControlsInjector, ExecutionControls, Logger)}.
+ */
+ public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+ final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+ tunnel.setTestInjectionControls(testInjector, testControls, testLogger);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
index 9b8ba38..0a1a397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
@@ -19,5 +19,8 @@ package org.apache.drill.exec.ops;
//TODO replace this when we switch to JDK8, which includes this
public interface Consumer<T> {
- public void accept(T t);
+
+ void accept(T t);
+
+ void interrupt(final InterruptedException e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index cf4e9bb..1cbe886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -90,6 +90,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
public void accept(final RpcException e) {
fail(e);
}
+
+ @Override
+ public void interrupt(final InterruptedException e) {
+ if (shouldContinue()) {
+ logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
+ fail(e);
+ }
+ }
};
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
index 79fc0b0..66e35b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
@@ -53,4 +53,10 @@ public class StatusHandler implements RpcOutcomeListener<Ack> {
// if we didn't get ack ok, we'll need to kill the query.
consumer.accept(new RpcException("Data not accepted downstream."));
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ sendingAccountor.decrement();
+ consumer.interrupt(e);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index fe6239e..1f6767c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -41,8 +42,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
return new SingleSenderRootExec(context, children.iterator().next(), config);
}
- private static class SingleSenderRootExec extends BaseRootExec {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+ public static class SingleSenderRootExec extends BaseRootExec {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(SingleSenderRootExec.class);
private final FragmentHandle oppositeHandle;
@@ -76,6 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
.setMinorFragmentId(config.getOppositeMinorFragmentId())
.build();
tunnel = context.getDataTunnel(config.getDestination());
+ tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 611052b..6da132b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -534,6 +534,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
+ // TODO: Code duplication. UnorderedReceiverBatch has the same implementation.
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
@@ -545,6 +546,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ if (context.shouldContinue()) {
+ final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+ logger.error(errMsg, e);
+ context.fail(new RpcException(errMsg, e));
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c355070..e210514 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -178,6 +178,7 @@ public class PartitionerDecorator {
} catch (final InterruptedException e) {
// If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
if (!context.shouldContinue()) {
+ logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
for(Future f : taskFutures) {
f.cancel(true);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index e40fe54..1498441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -240,6 +240,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
}
}
+ // TODO: Code duplication. MergingRecordBatch has the same implementation.
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
@@ -251,6 +252,15 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ if (context.shouldContinue()) {
+ final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+ logger.error(errMsg, e);
+ context.fail(new RpcException(errMsg, e));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 9b071ad..dfc62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -32,5 +32,10 @@ public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
public void success(T value, ByteBuf buffer) {
}
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index d551173..34758ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -194,19 +194,45 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void operationComplete(ChannelFuture future) throws Exception {
+ boolean isInterrupted = false;
+
+ final long timeoutMills = 30000;
+
+ // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+ // So there is no point propagating the interruption as failure immediately.
+ final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
- try {
- future.get();
- if (future.isSuccess()) {
- // send a handshake on the current thread. This is the only time we will send from within the event thread.
- // We can do this because the connection will not be backed up.
- send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
- } else {
- l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ while(true) {
+ try {
+ future.get(timeoutMills, TimeUnit.MILLISECONDS);
+ if (future.isSuccess()) {
+ // send a handshake on the current thread. This is the only time we will send from within the event thread.
+ // We can do this because the connection will not be backed up.
+ send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
+ } else {
+ l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ }
+ // logger.debug("Handshake queued for send.");
+ break;
+ } catch (final InterruptedException interruptEx) {
+ // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+ isInterrupted = true;
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ l.connectionFailed(FailureType.CONNECTION, interruptEx);
+ break;
+ }
+ } catch (final Exception ex) {
+ l.connectionFailed(FailureType.CONNECTION, ex);
+ break;
}
- // logger.debug("Handshake queued for send.");
- } catch (Exception ex) {
- l.connectionFailed(FailureType.CONNECTION, ex);
+ }
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
}
@@ -235,6 +261,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
}
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for handshake response", ex);
+ l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 19d9c30..cbe63c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -31,17 +31,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
super(new InnerFuture<V>());
}
- /**
- * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
- * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
- * will result in an UnsupportedOperationException.
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- throw new UnsupportedOperationException(
- "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
- }
-
@Override
protected RpcException mapException(Exception ex) {
return RpcException.mapException(ex);
@@ -72,6 +61,12 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
}
@Override
+ public void interrupted(final InterruptedException ex) {
+ // Propagate the interrupt to inner future
+ ( (InnerFuture<V>)delegate()).cancel(true);
+ }
+
+ @Override
public ByteBuf getBuffer() {
return buffer;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 6c7bf3e..53d0772 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -58,6 +58,12 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteCo
settableFuture.set(value);
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // If we are interrupted while performing the command, consider as failure.
+ logger.warn("Interrupted while running the command", e);
+ failed(new RpcException(e));
+ }
}
public DrillRpcFuture<T> getFuture() {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index e32ca8a..92022b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -55,6 +55,10 @@ public abstract class ListeningCommand<T extends MessageLite, C extends RemoteCo
listener.success(value, buf);
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ listener.interrupted(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index f0787a5..12e0063 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -22,6 +22,8 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
@@ -100,28 +102,44 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
* Called by
*/
public void waitAndRun() {
- try {
-// logger.debug("Waiting for connection.");
- CONNECTION_TYPE connection = this.get();
-
- if (connection == null) {
-// logger.debug("Connection failed.");
- return;
- } else {
-// logger.debug("Connection received. {}", connection);
- cmd.connectionSucceeded(connection);
-// logger.debug("Finished connection succeeded activity.");
+ boolean isInterrupted = false;
+
+ final long timeoutMills = 30000;
+ // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+ // So there is no point propagating the interruption as failure immediately.
+ final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
+ while(true) {
+ try {
+ // logger.debug("Waiting for connection.");
+ CONNECTION_TYPE connection = this.get(timeoutMills, TimeUnit.MILLISECONDS);
+
+ if (connection == null) {
+ // logger.debug("Connection failed.");
+ } else {
+ // logger.debug("Connection received. {}", connection);
+ cmd.connectionSucceeded(connection);
+ // logger.debug("Finished connection succeeded activity.");
+ }
+ break;
+ } catch (final InterruptedException interruptEx) {
+ // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+ isInterrupted = true;
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
+ break;
+ }
+ } catch (final ExecutionException | TimeoutException ex) {
+ cmd.connectionFailed(FailureType.CONNECTION, ex);
}
- } catch (final InterruptedException e) {
- cmd.connectionFailed(FailureType.CONNECTION, e);
+ }
+ if (isInterrupted) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- throw new IllegalStateException();
}
-
}
@Override
@@ -243,7 +261,6 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
delegate.connectionFailed(type, t);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 2ee9263..199569c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -71,7 +71,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
writeManager.waitForWritable();
return true;
}catch(final InterruptedException e){
- listener.failed(new RpcException(e));
+ listener.interrupted(e);
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
@@ -135,7 +135,6 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
if (channel.isActive()) {
channel.close().get();
}
- channel.close().get();
} catch (final InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 7d7c860..4485cf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -21,9 +21,19 @@ import io.netty.buffer.ByteBuf;
public interface RpcOutcomeListener<V> {
- public void failed(RpcException ex);
- public void success(V value, ByteBuf buffer);
+ /**
+ * Called when an error occurred while waiting for the RPC outcome.
+ * @param ex
+ */
+ void failed(RpcException ex);
+ void success(V value, ByteBuf buffer);
+
+ /**
+ * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or
+ * failures.
+ */
+ void interrupted(final InterruptedException e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index ed31bed..143020f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -31,6 +31,8 @@ import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class DataTunnel {
@@ -39,17 +41,49 @@ public class DataTunnel {
private final DataConnectionManager manager;
private final Semaphore sendingSemaphore = new Semaphore(3);
+ // Needed for injecting a test pause
+ private boolean isInjectionControlSet;
+ private ExecutionControlsInjector testInjector;
+ private ExecutionControls testControls;
+ private org.slf4j.Logger testLogger;
+
+
public DataTunnel(DataConnectionManager manager) {
this.manager = manager;
}
+ /**
+ * Once a DataTunnel is created, clients of DataTunnel can pass injection controls to enable setting injections at
+ * pre-defined places. Currently following injection sites are available.
+ *
+ * 1. In method {@link #sendRecordBatch(RpcOutcomeListener, FragmentWritableBatch)}, an interruptible pause injection
+ * is available before acquiring the sending slot. Site name is: "data-tunnel-send-batch-wait-for-interrupt"
+ *
+ * @param testInjector
+ * @param testControls
+ * @param testLogger
+ */
+ public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+ final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+ isInjectionControlSet = true;
+ this.testInjector = testInjector;
+ this.testControls = testControls;
+ this.testLogger = testLogger;
+ }
+
public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
try{
+ if (isInjectionControlSet) {
+ // Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for
+ // semaphore acquire. We expect the
+ testInjector.injectInterruptiblePause(testControls, "data-tunnel-send-batch-wait-for-interrupt", testLogger);
+ }
+
sendingSemaphore.acquire();
manager.runCommand(b);
}catch(final InterruptedException e){
- outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
+ outcomeListener.interrupted(e);
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
@@ -57,6 +91,7 @@ public class DataTunnel {
}
}
+ // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
try{
@@ -93,6 +128,11 @@ public class DataTunnel {
inner.success(value, buffer);
}
+ @Override
+ public void interrupted(InterruptedException e) {
+ sendingSemaphore.release();
+ inner.interrupted(e);
+ }
}
private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 143d104..8443948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -349,9 +349,21 @@ public class QueryResultHandler {
throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
}
}
-
}
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for query results from Drillbit", ex);
+
+ if (!isTerminal.compareAndSet(false, true)) {
+ return;
+ }
+
+ closeFuture.removeListener(closeListener);
+
+ // Throw an interrupted UserException?
+ resultsListener.submissionFailed(UserException.systemError(ex).build());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index bb13d1f..bf4221e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -42,6 +42,11 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
final Logger logger) {
}
+ @Override
+ public void injectInterruptiblePause(ExecutionControls executionControls, String desc, Logger logger)
+ throws InterruptedException {
+ }
+
/**
* When assertions are not enabled, this count down latch that does nothing is injected.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bf62ccb..af3309f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -610,6 +610,18 @@ public class Foreman implements Runnable {
}
/**
+ * Ignore the current status and force the given failure as current status.
+ * NOTE: Used only for testing purposes. Shouldn't be used in production.
+ */
+ public void setForceFailure(final Exception exception) {
+ Preconditions.checkArgument(exception != null);
+ Preconditions.checkState(!isClosed);
+
+ resultState = QueryState.FAILED;
+ resultException = exception;
+ }
+
+ /**
* Add an exception to the result. All exceptions after the first become suppressed
* exceptions hanging off the first.
*
@@ -837,6 +849,14 @@ public class Foreman implements Runnable {
if ((newState == QueryState.CANCELED)
|| (newState == QueryState.COMPLETED)
|| (newState == QueryState.FAILED)) {
+
+ if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+ if (newState == QueryState.FAILED) {
+ assert exception != null;
+ recordNewState(QueryState.FAILED);
+ foremanResult.setForceFailure(exception);
+ }
+ }
/*
* These amount to a completion of the cancellation requests' cleanup;
* now we can clean up and send the result.
@@ -1109,6 +1129,15 @@ public class Foreman implements Runnable {
stateListener.moveToState(QueryState.FAILED, ex);
}
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
+ // Consider the interrupt as failure.
+ final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+ logger.error(errMsg, e);
+ failed(new RpcException(errMsg, e));
+ }
}
/**
@@ -1139,10 +1168,15 @@ public class Foreman implements Runnable {
private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@Override
public void failed(final RpcException ex) {
- logger.info(
- "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
- ex);
+ logger.info("Failure while trying communicate query result to initiating client. " +
+ "This would happen if a client is disconnected before response notice can be sent.", ex);
stateListener.moveToState(QueryState.FAILED, ex);
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
+ stateListener.moveToState(QueryState.FAILED, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 84a38a6..eed4e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -267,6 +267,12 @@ public class QueryManager {
ack);
}
}
+
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
+ "Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
+ }
}
QueryState updateEphemeralState(final QueryState queryState) {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2712fe7..6b44ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -145,6 +145,7 @@ public class FragmentExecutor implements Runnable {
*/
final Thread myThread = myThreadRef.get();
if (myThread != null) {
+ logger.debug("Interrupting fragment thread {}", myThread.getName());
myThread.interrupt();
}
}
@@ -343,6 +344,8 @@ public class FragmentExecutor implements Runnable {
case FINISHED:
if(current == FragmentState.CANCELLATION_REQUESTED){
target = FragmentState.CANCELLED;
+ } else if (current == FragmentState.FAILED) {
+ target = FragmentState.FAILED;
}
// fall-through
case FAILED:
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index d98b97a..b1e3fde 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -159,5 +159,8 @@ drill.exec: {
initial: 20000000
}
},
- debug.error_on_leak: true
+ debug: {
+ error_on_leak: true,
+ return_error_for_failure_in_cancelled_fragments: false
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index 7fcf4cb..a5db81d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Throwables.propagate;
import java.io.File;
import java.io.IOException;
+import java.util.Properties;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.util.MiniZooKeeperCluster;
@@ -45,7 +46,11 @@ public class ZookeeperHelper {
* <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
*/
public ZookeeperHelper() {
- config = DrillConfig.create();
+ final Properties overrideProps = new Properties();
+ // Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
+ // Setting this causes unittests to fail.
+ // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ config = DrillConfig.create(overrideProps);
zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
if (!testDir.exists()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 3749716..72e6c44 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -148,6 +148,10 @@ public class TestBitRpc extends ExecTest {
}
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // TODO(We don't have any interrupts in test code)
+ }
}
private class BitComTestHandler implements DataResponseHandler {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index d72d498..3efb629 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
@@ -780,4 +781,11 @@ public class TestDrillbitResilience {
Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
}
}
+
+ @Test
+ public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+ final String control =
+ createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+ assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+ }
}
[3/6] drill git commit: DRILL-2780: Check for open files when closing
OperatorContext
Posted by sm...@apache.org.
DRILL-2780: Check for open files when closing OperatorContext
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ed200e25
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ed200e25
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ed200e25
Branch: refs/heads/master
Commit: ed200e257249e67176a259bab0d607841bf37fa4
Parents: 6d7cda8
Author: Steven Phillips <sm...@apache.org>
Authored: Sun May 10 02:33:14 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/ops/OperatorContext.java | 5 +
.../drill/exec/ops/OperatorContextImpl.java | 22 ++++
.../drill/exec/store/dfs/DrillFileSystem.java | 42 +++----
.../exec/store/dfs/easy/EasyFormatPlugin.java | 2 +-
.../store/parquet/ParquetScanBatchCreator.java | 2 +-
.../exec/work/fragment/FragmentExecutor.java | 2 +-
.../drill/exec/testing/TestResourceLeak.java | 126 +++++++++++++++++++
.../resources/memory/tpch01_memory_leak.sql | 22 ++++
8 files changed, 195 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 35139d5..7eb7d8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
public abstract class OperatorContext {
@@ -39,6 +42,8 @@ public abstract class OperatorContext {
public abstract ExecutionControls getExecutionControls();
+ public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
+
public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator();
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 9fa8867..ce9f351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,14 +17,20 @@
*/
package org.apache.drill.exec.ops;
+import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
class OperatorContextImpl extends OperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
@@ -36,6 +42,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private OperatorStats stats;
private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
private final boolean applyFragmentLimit;
+ private DrillFileSystem fs;
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
this.applyFragmentLimit=applyFragmentLimit;
@@ -108,6 +115,14 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
if (allocator != null) {
allocator.close();
}
+
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
closed = true;
}
@@ -115,4 +130,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return stats;
}
+ @Override
+ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+ Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+ fs = new DrillFileSystem(conf, getStats());
+ return fs;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index b6a9c30..25dd811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -71,30 +71,7 @@ import com.google.common.collect.Maps;
public class DrillFileSystem extends FileSystem implements OpenFileTracker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
- private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
-
- static {
- if (TRACKING_ENABLED) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- if (openedFiles.size() != 0) {
- final StringBuffer errMsgBuilder = new StringBuffer();
-
- errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
- " still [%d] files open.\n", openedFiles.size()));
-
- for(DebugStackTrace stackTrace : openedFiles.values()) {
- stackTrace.addToStringBuilder(errMsgBuilder);
- }
-
- final String errMsg = errMsgBuilder.toString();
- logger.error(errMsg);
- throw new IllegalStateException(errMsg);
- }
- }
- });
- }
- }
+ private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
private final FileSystem underlyingFs;
private final OperatorStats operatorStats;
@@ -420,7 +397,22 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
@Override
public void close() throws IOException {
- underlyingFs.close();
+ if (TRACKING_ENABLED) {
+ if (openedFiles.size() != 0) {
+ final StringBuffer errMsgBuilder = new StringBuffer();
+
+ errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+ " still [%d] files open.\n", openedFiles.size()));
+
+ for (DebugStackTrace stackTrace : openedFiles.values()) {
+ stackTrace.addToStringBuilder(errMsgBuilder);
+ }
+
+ final String errMsg = errMsgBuilder.toString();
+ logger.error(errMsg);
+ throw new IllegalStateException(errMsg);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d1b46e1..233c32b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*/);
final DrillFileSystem dfs;
try {
- dfs = new DrillFileSystem(fsConf, oContext.getStats());
+ dfs = oContext.newFileSystem(fsConf);
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5e9c4ca..4423fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -100,7 +100,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
DrillFileSystem fs;
try {
- fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+ fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
} catch(IOException e) {
throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index d96e6d6..2712fe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -228,7 +228,6 @@ public class FragmentExecutor implements Runnable {
}
});
- updateState(FragmentState.FINISHED);
} catch (OutOfMemoryError | OutOfMemoryRuntimeException e) {
if (!(e instanceof OutOfMemoryError) || "Direct buffer memory".equals(e.getMessage())) {
fail(UserException.memoryError(e).build());
@@ -249,6 +248,7 @@ public class FragmentExecutor implements Runnable {
closeOutResources();
+ updateState(FragmentState.FINISHED);
// send the final state of the fragment. only the main execution thread can send the final state and it can
// only be sent once.
sendFinalState();
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
new file mode 100644
index 0000000..d7e317c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+public class TestResourceLeak extends DrillTest {
+
+ private static DrillClient client;
+ private static Drillbit bit;
+ private static RemoteServiceSet serviceSet;
+ private static DrillConfig config;
+ private static BufferAllocator allocator;
+
+ @SuppressWarnings("serial")
+ private static final Properties TEST_CONFIGURATIONS = new Properties() {
+ {
+ put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+ put(ExecConstants.HTTP_ENABLE, "false");
+ }
+ };
+
+ @BeforeClass
+ public static void openClient() throws Exception {
+ config = DrillConfig.create(TEST_CONFIGURATIONS);
+ allocator = new TopLevelAllocator(config);
+ serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ bit = new Drillbit(config, serviceSet);
+ bit.run();
+ client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+ }
+
+ @Test()
+ public void tpch01() throws Exception {
+ final String query = getFile("memory/tpch01_memory_leak.sql");
+ try {
+ QueryTestUtil.test(client, "alter session set `planner.slice_target` = 10; " + query);
+ } catch (UserRemoteException e) {
+ if (e.getMessage().contains("Attempted to close accountor")) {
+ return;
+ }
+ throw e;
+ }
+ Assert.fail("Expected UserRemoteException indicating memory leak");
+ }
+
+ private static String getFile(String resource) throws IOException {
+ URL url = Resources.getResource(resource);
+ if (url == null) {
+ throw new IOException(String.format("Unable to find path %s.", resource));
+ }
+ return Resources.toString(url, Charsets.UTF_8);
+ }
+
+ @AfterClass
+ public static void closeClient() throws Exception {
+ try {
+ allocator.close();
+ serviceSet.close();
+ bit.close();
+ client.close();
+ } catch (IllegalStateException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @FunctionTemplate(name = "leakResource", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+ public static class Leak implements DrillSimpleFunc {
+
+ @Param Float8Holder in;
+ @Inject DrillBuf buf;
+ @Output Float8Holder out;
+
+ public void setup() {}
+
+ public void eval() {
+ buf.getAllocator().buffer(1);
+ out.value = in.value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
new file mode 100644
index 0000000..12d30ab
--- /dev/null
+++ b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
@@ -0,0 +1,22 @@
+select
+ l_returnflag,
+ l_linestatus,
+ sum(leakResource(l_quantity)) as sum_qty,
+ sum(l_extendedprice) as sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+ avg(l_quantity) as avg_qty,
+ avg(l_extendedprice) as avg_price,
+ avg(l_discount) as avg_disc,
+ count(*) as count_order
+from
+ cp.`tpch/lineitem.parquet`
+where
+ l_shipdate <= date '1998-12-01' - interval '120' day (3)
+group by
+ l_returnflag,
+ l_linestatus
+
+order by
+ l_returnflag,
+ l_linestatus;
\ No newline at end of file