You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/13 04:42:53 UTC
[4/6] drill git commit: DRILL-2755: (part2) Use and handle
InterruptedException during query processing
DRILL-2755: (part2) Use and handle InterruptedException during query processing
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/36163105
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/36163105
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/36163105
Branch: refs/heads/master
Commit: 361631056ad00ab5f7df1eeed50d95f654f642a3
Parents: ed200e2
Author: vkorukanti <ve...@gmail.com>
Authored: Tue May 12 10:23:38 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 9 ++++
.../drill/exec/ops/AccountingDataTunnel.java | 11 ++++
.../org/apache/drill/exec/ops/Consumer.java | 5 +-
.../apache/drill/exec/ops/FragmentContext.java | 8 +++
.../apache/drill/exec/ops/StatusHandler.java | 6 +++
.../exec/physical/impl/SingleSenderCreator.java | 8 ++-
.../impl/mergereceiver/MergingRecordBatch.java | 10 ++++
.../partitionsender/PartitionerDecorator.java | 1 +
.../UnorderedReceiverBatch.java | 10 ++++
.../drill/exec/rpc/BaseRpcOutcomeListener.java | 7 ++-
.../org/apache/drill/exec/rpc/BasicClient.java | 53 ++++++++++++++++----
.../drill/exec/rpc/DrillRpcFutureImpl.java | 17 +++----
.../apache/drill/exec/rpc/FutureBitCommand.java | 6 +++
.../apache/drill/exec/rpc/ListeningCommand.java | 4 ++
.../drill/exec/rpc/ReconnectingConnection.java | 51 ++++++++++++-------
.../apache/drill/exec/rpc/RemoteConnection.java | 3 +-
.../drill/exec/rpc/RpcOutcomeListener.java | 14 +++++-
.../apache/drill/exec/rpc/data/DataTunnel.java | 42 +++++++++++++++-
.../drill/exec/rpc/user/QueryResultHandler.java | 14 +++++-
.../exec/testing/NoOpControlsInjector.java | 5 ++
.../apache/drill/exec/work/foreman/Foreman.java | 40 +++++++++++++--
.../drill/exec/work/foreman/QueryManager.java | 6 +++
.../exec/work/fragment/FragmentExecutor.java | 3 ++
.../src/main/resources/drill-module.conf | 5 +-
.../org/apache/drill/exec/ZookeeperHelper.java | 7 ++-
.../apache/drill/exec/server/TestBitRpc.java | 4 ++
.../exec/server/TestDrillbitResilience.java | 8 +++
27 files changed, 303 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1a10aa2..7bbb815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -86,6 +86,15 @@ public interface ExecConstants {
public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl";
public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
+
+ /**
+ * Currently, if a query is cancelled but one of its fragments reports its status as FAILED instead of CANCELLED or
+ * FINISHED, we report the query result as CANCELLED, swallowing the failures that occurred in the fragments. This BOOT
+ * setting allows the user to see the query status as FAILED instead. Useful for developers/testers.
+ */
+ public static final String RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS =
+ "drill.exec.debug.return_error_for_failure_in_cancelled_fragments";
+
/** Fragment memory planning */
public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index 2bcfdbc..2659464 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -21,6 +21,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
+import org.slf4j.Logger;
/**
* Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent to
@@ -41,4 +44,12 @@ public class AccountingDataTunnel {
sendingAccountor.increment();
tunnel.sendRecordBatch(statusHandler, batch);
}
+
+ /**
+ * See {@link DataTunnel#setTestInjectionControls(ExecutionControlsInjector, ExecutionControls, Logger)}.
+ */
+ public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+ final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+ tunnel.setTestInjectionControls(testInjector, testControls, testLogger);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
index 9b8ba38..0a1a397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Consumer.java
@@ -19,5 +19,8 @@ package org.apache.drill.exec.ops;
//TODO replace this when we switch to JDK8, which includes this
public interface Consumer<T> {
- public void accept(T t);
+
+ void accept(T t);
+
+ void interrupt(final InterruptedException e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index cf4e9bb..1cbe886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -90,6 +90,14 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
public void accept(final RpcException e) {
fail(e);
}
+
+ @Override
+ public void interrupt(final InterruptedException e) {
+ if (shouldContinue()) {
+ logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
+ fail(e);
+ }
+ }
};
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
index 79fc0b0..66e35b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StatusHandler.java
@@ -53,4 +53,10 @@ public class StatusHandler implements RpcOutcomeListener<Ack> {
// if we didn't get ack ok, we'll need to kill the query.
consumer.accept(new RpcException("Data not accepted downstream."));
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ sendingAccountor.decrement();
+ consumer.interrupt(e);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index fe6239e..1f6767c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -41,8 +42,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
return new SingleSenderRootExec(context, children.iterator().next(), config);
}
- private static class SingleSenderRootExec extends BaseRootExec {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+ public static class SingleSenderRootExec extends BaseRootExec {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(SingleSenderRootExec.class);
private final FragmentHandle oppositeHandle;
@@ -76,6 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
.setMinorFragmentId(config.getOppositeMinorFragmentId())
.build();
tunnel = context.getDataTunnel(config.getDestination());
+ tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 611052b..6da132b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -534,6 +534,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
+ // TODO: Code duplication. UnorderedReceiverBatch has the same implementation.
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
@@ -545,6 +546,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ if (context.shouldContinue()) {
+ final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+ logger.error(errMsg, e);
+ context.fail(new RpcException(errMsg, e));
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c355070..e210514 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -178,6 +178,7 @@ public class PartitionerDecorator {
} catch (final InterruptedException e) {
// If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
if (!context.shouldContinue()) {
+ logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
for(Future f : taskFutures) {
f.cancel(true);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index e40fe54..1498441 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -240,6 +240,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
}
}
+ // TODO: Code duplication. MergingRecordBatch has the same implementation.
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
@@ -251,6 +252,15 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
public void success(final Ack value, final ByteBuf buffer) {
// Do nothing
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ if (context.shouldContinue()) {
+ final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+ logger.error(errMsg, e);
+ context.fail(new RpcException(errMsg, e));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 9b071ad..dfc62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -32,5 +32,10 @@ public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
public void success(T value, ByteBuf buffer) {
}
-
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index d551173..34758ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -194,19 +194,45 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
@Override
public void operationComplete(ChannelFuture future) throws Exception {
+ boolean isInterrupted = false;
+
+ final long timeoutMills = 30000;
+
+ // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+ // so there is no point propagating the interruption as a failure immediately.
+ final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
- try {
- future.get();
- if (future.isSuccess()) {
- // send a handshake on the current thread. This is the only time we will send from within the event thread.
- // We can do this because the connection will not be backed up.
- send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
- } else {
- l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ while(true) {
+ try {
+ future.get(timeoutMills, TimeUnit.MILLISECONDS);
+ if (future.isSuccess()) {
+ // send a handshake on the current thread. This is the only time we will send from within the event thread.
+ // We can do this because the connection will not be backed up.
+ send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
+ } else {
+ l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+ }
+ // logger.debug("Handshake queued for send.");
+ break;
+ } catch (final InterruptedException interruptEx) {
+ // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+ isInterrupted = true;
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ l.connectionFailed(FailureType.CONNECTION, interruptEx);
+ break;
+ }
+ } catch (final Exception ex) {
+ l.connectionFailed(FailureType.CONNECTION, ex);
+ break;
}
- // logger.debug("Handshake queued for send.");
- } catch (Exception ex) {
- l.connectionFailed(FailureType.CONNECTION, ex);
+ }
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
}
@@ -235,6 +261,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
}
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for handshake response", ex);
+ l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 19d9c30..cbe63c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -31,17 +31,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
super(new InnerFuture<V>());
}
- /**
- * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
- * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
- * will result in an UnsupportedOperationException.
- */
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- throw new UnsupportedOperationException(
- "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
- }
-
@Override
protected RpcException mapException(Exception ex) {
return RpcException.mapException(ex);
@@ -72,6 +61,12 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
}
@Override
+ public void interrupted(final InterruptedException ex) {
+ // Propagate the interrupt to inner future
+ ( (InnerFuture<V>)delegate()).cancel(true);
+ }
+
+ @Override
public ByteBuf getBuffer() {
return buffer;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
index 6c7bf3e..53d0772 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/FutureBitCommand.java
@@ -58,6 +58,12 @@ public abstract class FutureBitCommand<T extends MessageLite, C extends RemoteCo
settableFuture.set(value);
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // If we are interrupted while performing the command, treat it as a failure.
+ logger.warn("Interrupted while running the command", e);
+ failed(new RpcException(e));
+ }
}
public DrillRpcFuture<T> getFuture() {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
index e32ca8a..92022b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ListeningCommand.java
@@ -55,6 +55,10 @@ public abstract class ListeningCommand<T extends MessageLite, C extends RemoteCo
listener.success(value, buf);
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ listener.interrupted(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index f0787a5..12e0063 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -22,6 +22,8 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
@@ -100,28 +102,44 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
* Called by
*/
public void waitAndRun() {
- try {
-// logger.debug("Waiting for connection.");
- CONNECTION_TYPE connection = this.get();
-
- if (connection == null) {
-// logger.debug("Connection failed.");
- return;
- } else {
-// logger.debug("Connection received. {}", connection);
- cmd.connectionSucceeded(connection);
-// logger.debug("Finished connection succeeded activity.");
+ boolean isInterrupted = false;
+
+ final long timeoutMills = 30000;
+ // We want to wait for at least 30 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+ // so there is no point propagating the interruption as a failure immediately.
+ final long targetMillis = System.currentTimeMillis() + timeoutMills;
+
+ while(true) {
+ try {
+ // logger.debug("Waiting for connection.");
+ CONNECTION_TYPE connection = this.get(timeoutMills, TimeUnit.MILLISECONDS);
+
+ if (connection == null) {
+ // logger.debug("Connection failed.");
+ } else {
+ // logger.debug("Connection received. {}", connection);
+ cmd.connectionSucceeded(connection);
+ // logger.debug("Finished connection succeeded activity.");
+ }
+ break;
+ } catch (final InterruptedException interruptEx) {
+ // Ignore the interrupt and continue to wait until targetMillis has elapsed.
+ isInterrupted = true;
+ final long wait = targetMillis - System.currentTimeMillis();
+ if (wait < 1) {
+ cmd.connectionFailed(FailureType.CONNECTION, interruptEx);
+ break;
+ }
+ } catch (final ExecutionException | TimeoutException ex) {
+ cmd.connectionFailed(FailureType.CONNECTION, ex);
}
- } catch (final InterruptedException e) {
- cmd.connectionFailed(FailureType.CONNECTION, e);
+ }
+ if (isInterrupted) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- throw new IllegalStateException();
}
-
}
@Override
@@ -243,7 +261,6 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
delegate.connectionFailed(type, t);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 2ee9263..199569c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -71,7 +71,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
writeManager.waitForWritable();
return true;
}catch(final InterruptedException e){
- listener.failed(new RpcException(e));
+ listener.interrupted(e);
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
@@ -135,7 +135,6 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
if (channel.isActive()) {
channel.close().get();
}
- channel.close().get();
} catch (final InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 7d7c860..4485cf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -21,9 +21,19 @@ import io.netty.buffer.ByteBuf;
public interface RpcOutcomeListener<V> {
- public void failed(RpcException ex);
- public void success(V value, ByteBuf buffer);
+ /**
+ * Called when an error occurred while waiting for the RPC outcome.
+ * @param ex the exception describing why the RPC failed
+ */
+ void failed(RpcException ex);
+ void success(V value, ByteBuf buffer);
+
+ /**
+ * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or
+ * failures.
+ */
+ void interrupted(final InterruptedException e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index ed31bed..143020f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -31,6 +31,8 @@ import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class DataTunnel {
@@ -39,17 +41,49 @@ public class DataTunnel {
private final DataConnectionManager manager;
private final Semaphore sendingSemaphore = new Semaphore(3);
+ // Needed for injecting a test pause
+ private boolean isInjectionControlSet;
+ private ExecutionControlsInjector testInjector;
+ private ExecutionControls testControls;
+ private org.slf4j.Logger testLogger;
+
+
public DataTunnel(DataConnectionManager manager) {
this.manager = manager;
}
+ /**
+ * Once a DataTunnel is created, clients of DataTunnel can pass injection controls to enable injections at
+ * pre-defined places. Currently, the following injection sites are available:
+ *
+ * 1. In method {@link #sendRecordBatch(RpcOutcomeListener, FragmentWritableBatch)}, an interruptible pause injection
+ * is available before acquiring the sending slot. Site name is: "data-tunnel-send-batch-wait-for-interrupt"
+ *
+ * @param testInjector injector used to inject the pause at the site listed above
+ * @param testControls execution controls passed to the injector
+ * @param testLogger logger passed to the injector
+ */
+ public void setTestInjectionControls(final ExecutionControlsInjector testInjector,
+ final ExecutionControls testControls, final org.slf4j.Logger testLogger) {
+ isInjectionControlSet = true;
+ this.testInjector = testInjector;
+ this.testControls = testControls;
+ this.testLogger = testLogger;
+ }
+
public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
try{
+ if (isInjectionControlSet) {
+ // If the injection is set, pause here until interrupted. Used to simulate the fragment being interrupted while it
+ // is waiting to acquire the sending semaphore.
+ testInjector.injectInterruptiblePause(testControls, "data-tunnel-send-batch-wait-for-interrupt", testLogger);
+ }
+
sendingSemaphore.acquire();
manager.runCommand(b);
}catch(final InterruptedException e){
- outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
+ outcomeListener.interrupted(e);
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
@@ -57,6 +91,7 @@ public class DataTunnel {
}
}
+ // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
try{
@@ -93,6 +128,11 @@ public class DataTunnel {
inner.success(value, buffer);
}
+ @Override
+ public void interrupted(InterruptedException e) {
+ sendingSemaphore.release();
+ inner.interrupted(e);
+ }
}
private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 143d104..8443948 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -349,9 +349,21 @@ public class QueryResultHandler {
throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
}
}
-
}
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for query results from Drillbit", ex);
+
+ if (!isTerminal.compareAndSet(false, true)) {
+ return;
+ }
+
+ closeFuture.removeListener(closeListener);
+
+ // TODO: consider reporting a dedicated "interrupted" UserException instead of a generic system error.
+ resultsListener.submissionFailed(UserException.systemError(ex).build());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index bb13d1f..bf4221e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -42,6 +42,11 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
final Logger logger) {
}
+ @Override
+ public void injectInterruptiblePause(ExecutionControls executionControls, String desc, Logger logger)
+ throws InterruptedException {
+ }
+
/**
* When assertions are not enabled, this count down latch that does nothing is injected.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bf62ccb..af3309f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -610,6 +610,18 @@ public class Foreman implements Runnable {
}
/**
+ * Ignores the current status and forces the given failure as the current status.
+ * NOTE: Used only for testing purposes. Shouldn't be used in production.
+ */
+ public void setForceFailure(final Exception exception) {
+ Preconditions.checkArgument(exception != null);
+ Preconditions.checkState(!isClosed);
+
+ resultState = QueryState.FAILED;
+ resultException = exception;
+ }
+
+ /**
* Add an exception to the result. All exceptions after the first become suppressed
* exceptions hanging off the first.
*
@@ -837,6 +849,14 @@ public class Foreman implements Runnable {
if ((newState == QueryState.CANCELED)
|| (newState == QueryState.COMPLETED)
|| (newState == QueryState.FAILED)) {
+
+ if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+ if (newState == QueryState.FAILED) {
+ assert exception != null;
+ recordNewState(QueryState.FAILED);
+ foremanResult.setForceFailure(exception);
+ }
+ }
/*
* These amount to a completion of the cancellation requests' cleanup;
* now we can clean up and send the result.
@@ -1109,6 +1129,15 @@ public class Foreman implements Runnable {
stateListener.moveToState(QueryState.FAILED, ex);
}
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // The Foreman is not expected to be interrupted while waiting for the RPC outcome of fragment submission,
+ // so treat an interrupt as a failure.
+ final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+ logger.error(errMsg, e);
+ failed(new RpcException(errMsg, e));
+ }
}
/**
@@ -1139,10 +1168,15 @@ public class Foreman implements Runnable {
private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@Override
public void failed(final RpcException ex) {
- logger.info(
- "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
- ex);
+ logger.info("Failure while trying communicate query result to initiating client. " +
+ "This would happen if a client is disconnected before response notice can be sent.", ex);
stateListener.moveToState(QueryState.FAILED, ex);
}
+
+ @Override
+ public void interrupted(final InterruptedException e) {
+ logger.warn("Interrupted while waiting for RPC outcome of sending final query result to initiating client.");
+ stateListener.moveToState(QueryState.FAILED, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 84a38a6..eed4e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -267,6 +267,12 @@ public class QueryManager {
ack);
}
}
+
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
+ "Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
+ }
}
QueryState updateEphemeralState(final QueryState queryState) {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2712fe7..6b44ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -145,6 +145,7 @@ public class FragmentExecutor implements Runnable {
*/
final Thread myThread = myThreadRef.get();
if (myThread != null) {
+ logger.debug("Interrupting fragment thread {}", myThread.getName());
myThread.interrupt();
}
}
@@ -343,6 +344,8 @@ public class FragmentExecutor implements Runnable {
case FINISHED:
if(current == FragmentState.CANCELLATION_REQUESTED){
target = FragmentState.CANCELLED;
+ } else if (current == FragmentState.FAILED) {
+ target = FragmentState.FAILED;
}
// fall-through
case FAILED:
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index d98b97a..b1e3fde 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -159,5 +159,8 @@ drill.exec: {
initial: 20000000
}
},
- debug.error_on_leak: true
+ debug: {
+ error_on_leak: true,
+ return_error_for_failure_in_cancelled_fragments: false
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index 7fcf4cb..a5db81d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Throwables.propagate;
import java.io.File;
import java.io.IOException;
+import java.util.Properties;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.util.MiniZooKeeperCluster;
@@ -45,7 +46,11 @@ public class ZookeeperHelper {
* <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
*/
public ZookeeperHelper() {
- config = DrillConfig.create();
+ final Properties overrideProps = new Properties();
+ // Disabled for now: query cancellation currently leaks memory (a known issue),
+ // and enabling this setting causes unit tests to fail.
+ // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ config = DrillConfig.create(overrideProps);
zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
if (!testDir.exists()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 3749716..72e6c44 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -148,6 +148,10 @@ public class TestBitRpc extends ExecTest {
}
}
+ @Override
+ public void interrupted(final InterruptedException e) {
+ // Intentionally a no-op: this test code does not generate interrupts.
+ }
}
private class BitComTestHandler implements DataResponseHandler {
http://git-wip-us.apache.org/repos/asf/drill/blob/36163105/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index d72d498..3efb629 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
@@ -780,4 +781,11 @@ public class TestDrillbitResilience {
Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
}
}
+
+ @Test
+ public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+ final String control =
+ createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+ assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+ }
}