You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/10 22:45:08 UTC
[3/3] drill git commit: DRILL-2755: Use and handle
InterruptedException during query processing.
DRILL-2755: Use and handle InterruptedException during query processing.
- Interrupt FragmentExecutor thread as part of FragmentExecutor.cancel()
- Handle InterruptedException in ExternalSortBatch.newSV2(). If the fragment status says it
should not continue, then rethrow the InterruptedException to the caller, which returns IterOutcome.STOP
- Add comments explaining why InterruptedException is not acted upon in SendingAccountor.waitForSendComplete()
- Handle InterruptedException in OrderedPartitionRecordBatch.getPartitionVectors()
If interrupted in Thread.sleep calls and the fragment status says it should not run, then
return IterOutcome.STOP downstream.
- Interrupt partitioner threads if PartitionerRecordBatch is interrupted while waiting for
partitioner threads to complete.
- Preserve interrupt status if not handled
- Handle null RecordBatches returned by RawBatchBuffer.getNext() in MergingRecordBatch.buildSchema()
- Change timeout in Foreman to be proportional to the number of intermediate fragments sent instead
of hard coded limit of 90s.
- Change TimedRunnable to enforce a timeout of 15s per runnable.
Total timeout is (15s * numOfRunnableTasks) / parallelism.
- Add unit tests
* Testing cancelling a query interrupts the query fragments which are currently blocked
* Testing interrupting the partitioner sender which in turn interrupts its helper threads
* Testing TimedRunnable enforces timeout for the whole task list.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3a294abc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3a294abc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3a294abc
Branch: refs/heads/master
Commit: 3a294abcc51148e0e79096af5e6d3c45b7c19a9d
Parents: f8e5e61
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 6 09:34:25 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun May 10 12:24:45 2015 -0700
----------------------------------------------------------------------
.../drill/common/concurrent/ExtendedLatch.java | 21 +---
.../apache/drill/exec/ops/SendingAccountor.java | 13 +++
.../impl/mergereceiver/MergingRecordBatch.java | 19 +++
.../OrderedPartitionRecordBatch.java | 31 ++++-
.../partitionsender/PartitionerDecorator.java | 70 +++++++++--
.../partitionsender/PartitionerTemplate.java | 3 +-
.../UnorderedReceiverBatch.java | 22 +++-
.../physical/impl/xsort/ExternalSortBatch.java | 10 +-
.../exec/record/RawFragmentBatchProvider.java | 2 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 9 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 7 +-
.../drill/exec/rpc/ReconnectingConnection.java | 7 +-
.../apache/drill/exec/rpc/RemoteConnection.java | 18 ++-
.../apache/drill/exec/rpc/data/DataTunnel.java | 14 ++-
.../org/apache/drill/exec/server/Drillbit.java | 4 +
.../apache/drill/exec/store/TimedRunnable.java | 52 +++++++--
.../exec/testing/CountDownLatchInjection.java | 5 +
.../testing/CountDownLatchInjectionImpl.java | 5 +
.../exec/testing/ExecutionControlsInjector.java | 28 +++++
.../exec/testing/NoOpControlsInjector.java | 4 +
.../drill/exec/testing/PauseInjection.java | 7 ++
.../org/apache/drill/exec/work/WorkManager.java | 24 ++--
.../exec/work/batch/SpoolingRawBatchBuffer.java | 9 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 8 +-
.../apache/drill/exec/work/foreman/Foreman.java | 14 ++-
.../exec/work/fragment/FragmentExecutor.java | 14 +++
.../exec/server/TestDrillbitResilience.java | 117 ++++++++++++++++++-
.../drill/exec/store/TestTimedRunnable.java | 103 ++++++++++++++++
.../drill/jdbc/impl/DrillResultSetImpl.java | 4 +
29 files changed, 546 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index a75ac32..3e14f8a 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -35,16 +35,6 @@ public class ExtendedLatch extends CountDownLatch {
}
/**
- * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
- * state variable or similar.
- *
- * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
- */
- protected boolean ignoreInterruptions() {
- return true;
- }
-
- /**
* Await without interruption for a given time.
* @param waitMillis
* Time in milliseconds to wait
@@ -68,8 +58,7 @@ public class ExtendedLatch extends CountDownLatch {
}
/**
- * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
- * output of ignoreInterruptions();
+ * Await without interruption. In the case of interruption, log a warning and continue to wait.
*/
public void awaitUninterruptibly() {
while (true) {
@@ -77,12 +66,8 @@ public class ExtendedLatch extends CountDownLatch {
await();
return;
} catch (final InterruptedException e) {
- if (ignoreInterruptions()) {
- // if we're still not ready, the while loop will cause us to wait again
- logger.warn("Interrupted while waiting for event latch.", e);
- } else {
- return;
- }
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting for event latch.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index 0cb5fbf..5db2cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -42,13 +42,26 @@ class SendingAccountor {
public synchronized void waitForSendComplete() {
int waitForBatches = batchesSent.get();
+ boolean isInterrupted = false;
while(waitForBatches != 0) {
try {
wait.acquire(waitForBatches);
waitForBatches = batchesSent.addAndGet(-1 * waitForBatches);
} catch (InterruptedException e) {
+ // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+ // to send. This should be safe because: network connections should get disconnected and fail a send if a
+ // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+ // (blocking is cooperative and will get triggered before this)
logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
+
+ isInterrupted = true;
}
}
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f19f371..5d990f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -70,6 +70,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
@@ -88,6 +89,7 @@ import com.sun.codemodel.JExpr;
*/
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(MergingRecordBatch.class);
private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
@@ -141,6 +143,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
stats.startWait();
final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try {
+ injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
final RawFragmentBatch b = provider.getNext();
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
@@ -148,6 +151,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
}
return b;
+ } catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
} finally {
stats.stopWait();
}
@@ -359,6 +368,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(node.batchId);
}
+
assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
if (nextBatch == null && !context.shouldContinue()) {
@@ -461,6 +471,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return;
}
final RawFragmentBatch batch = getNext(i);
+ if (batch == null) {
+ if (!context.shouldContinue()) {
+ state = BatchState.STOP;
+ } else {
+ state = BatchState.DONE;
+ }
+
+ break;
+ }
if (batch.getHeader().getDef().getFieldCount() == 0) {
i++;
continue;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 63b7eba..ca6d83c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -246,6 +246,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
/**
+ * Wait until the at least the given timeout is expired or interrupted and the fragment status is not runnable.
+ * @param timeout Timeout in milliseconds.
+ * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+ */
+ private boolean waitUntilTimeOut(final long timeout) {
+ while(true) {
+ try {
+ Thread.sleep(timeout);
+ return true;
+ } catch (final InterruptedException e) {
+ if (!context.shouldContinue()) {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
* This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
* that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
* distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
@@ -255,10 +273,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @return True is successful. False if failed.
*/
private boolean getPartitionVectors() {
-
-
try {
-
if (!saveSamples()) {
return false;
}
@@ -279,14 +294,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// TODO: this should be polling.
if (val < fragmentsBeforeProceed) {
- Thread.sleep(10);
+ if (!waitUntilTimeOut(10)) {
+ return false;
+ }
}
for (int i = 0; i < 100 && finalTable == null; i++) {
finalTable = tableMap.get(finalTableKey);
if (finalTable != null) {
break;
}
- Thread.sleep(10);
+ if (!waitUntilTimeOut(10)) {
+ return false;
+ }
}
if (finalTable == null) {
buildTable();
@@ -302,7 +321,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
partitionVectors.add(w.getValueVector());
}
- } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
+ } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
kill(false);
context.fail(ex);
return false;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c3261dc..c355070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -28,6 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.testing.CountDownLatchInjection;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
/**
* Decorator class to hide multiple Partitioner existence from the caller
@@ -38,19 +41,22 @@ import com.google.common.collect.Lists;
* totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
*/
public class PartitionerDecorator {
-
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(PartitionerDecorator.class);
private List<Partitioner> partitioners;
private final OperatorStats stats;
private final String tName;
private final String childThreadPrefix;
private final ExecutorService executor;
+ private final FragmentContext context;
public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
this.partitioners = partitioners;
this.stats = stats;
+ this.context = context;
this.executor = context.getDrillbitContext().getExecutor();
this.tName = Thread.currentThread().getName();
this.childThreadPrefix = "Partitioner-" + tName + "-";
@@ -145,17 +151,42 @@ public class PartitionerDecorator {
stats.startWait();
final CountDownLatch latch = new CountDownLatch(partitioners.size());
final List<CustomRunnable> runnables = Lists.newArrayList();
+ final List<Future> taskFutures = Lists.newArrayList();
+ CountDownLatchInjection testCountDownLatch = null;
try {
- int i = 0;
- for (final Partitioner part : partitioners ) {
- runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
- executor.submit(runnables.get(i++));
+ // To simulate interruption of main fragment thread and interrupting the partition threads, create a
+ // CountDownInject patch. Partitioner threads await on the latch and main fragment thread counts down or
+ // interrupts waiting threads. This makes sures that we are actually interrupting the blocked partitioner threads.
+ testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
+ testCountDownLatch.initialize(1);
+ for (final Partitioner part : partitioners) {
+ final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
+ runnables.add(runnable);
+ taskFutures.add(executor.submit(runnable));
}
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ while (true) {
+ try {
+ // Wait for main fragment interruption.
+ injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+
+ // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
+ injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
+
+ latch.await();
+ break;
+ } catch (final InterruptedException e) {
+ // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
+ if (!context.shouldContinue()) {
+ for(Future f : taskFutures) {
+ f.cancel(true);
+ }
+
+ break;
+ }
+ }
}
+
IOException excep = null;
for (final CustomRunnable runnable : runnables ) {
IOException myException = runnable.getException();
@@ -180,8 +211,12 @@ public class PartitionerDecorator {
// scale down main stats wait time based on calculated processing time
// since we did not wait for whole duration of above execution
stats.adjustWaitNanos(-maxProcessTime);
- }
+ // Done with the latch, close it.
+ if (testCountDownLatch != null) {
+ testCountDownLatch.close();
+ }
+ }
}
/**
@@ -190,7 +225,7 @@ public class PartitionerDecorator {
* protected is for testing purposes
*/
protected interface GeneralExecuteIface {
- public void execute(Partitioner partitioner) throws IOException;
+ void execute(Partitioner partitioner) throws IOException;
}
/**
@@ -242,17 +277,28 @@ public class PartitionerDecorator {
private final CountDownLatch latch;
private final GeneralExecuteIface iface;
private final Partitioner part;
+ private CountDownLatchInjection testCountDownLatch;
+
private volatile IOException exp;
- public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+ public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
+ final Partitioner part, CountDownLatchInjection testCountDownLatch) {
this.parentThreadName = parentThreadName;
this.latch = latch;
this.iface = iface;
this.part = part;
+ this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
+ // Test only - Pause until interrupted by fragment thread
+ try {
+ testCountDownLatch.await();
+ } catch (final InterruptedException e) {
+ logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+ }
+
final Thread currThread = Thread.currentThread();
final String currThreadName = currThread.getName();
final OperatorStats localStats = part.getStats();
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index cbea267..aeac01d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -285,7 +285,8 @@ public abstract class PartitionerTemplate implements Partitioner {
// to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
// sender has acknowledged the terminate request. After sending the last batch, all further batches are
// dropped.
- final boolean isLastBatch = isLast || terminated;
+ // 3. Partitioner thread is interrupted due to cancellation of fragment.
+ final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
// if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
if (!isLastBatch && recordCount == 0) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 66a2092..e40fe54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -49,9 +49,12 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class UnorderedReceiverBatch implements CloseableRecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+ private final static ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(UnorderedReceiverBatch.class);
private final RecordBatchLoader batchLoader;
private final RawFragmentBatchProvider fragProvider;
@@ -133,6 +136,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
return batchLoader.getValueAccessorById(clazz, ids);
}
+ private RawFragmentBatch getNextBatch() throws IOException {
+ try {
+ injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
+ return fragProvider.getNext();
+ } catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
+ }
+ }
+
@Override
public IterOutcome next() {
stats.startProcessing();
@@ -140,11 +156,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
RawFragmentBatch batch;
try {
stats.startWait();
- batch = fragProvider.getNext();
+ batch = getNextBatch();
// skip over empty batches. we do this since these are basically control messages.
while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
- batch = fragProvider.getNext();
+ batch = getNextBatch();
}
} finally {
stats.stopWait();
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index aab3391..3159811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -277,6 +277,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} else {
try {
sv2 = newSV2();
+ } catch(InterruptedException e) {
+ return IterOutcome.STOP;
} catch (OutOfMemoryException e) {
throw new OutOfMemoryRuntimeException(e);
}
@@ -496,7 +498,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return size;
}
- private SelectionVector2 newSV2() throws OutOfMemoryException {
+ private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
if (!sv2.allocateNew(incoming.getRecordCount())) {
try {
@@ -509,8 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
while (true) {
try {
Thread.sleep(waitTime * 1000);
- } catch (InterruptedException e) {
- throw new OutOfMemoryException(e);
+ } catch(final InterruptedException e) {
+ if (!context.shouldContinue()) {
+ throw e;
+ }
}
waitTime *= 2;
if (sv2.allocateNew(incoming.getRecordCount())) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index d4dfe96..030785c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.ops.FragmentContext;
public interface RawFragmentBatchProvider {
- public RawFragmentBatch getNext() throws IOException;
+ public RawFragmentBatch getNext() throws IOException, InterruptedException;
public void kill(FragmentContext context);
public void cleanup();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 1661f81..d551173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -263,9 +263,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
logger.debug("Closing client");
try {
connection.getChannel().close().get();
- } catch (InterruptedException | ExecutionException e) {
- logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
- // TODO InterruptedException
+ } catch (final InterruptedException | ExecutionException e) {
+ logger.warn("Failure while shutting {}", this.getClass().getName(), e);
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a148436..6a7bc65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -204,9 +204,12 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
public void close() throws IOException {
try {
eventLoopGroup.shutdownGracefully().get();
- } catch (InterruptedException | ExecutionException e) {
+ } catch (final InterruptedException | ExecutionException e) {
logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index 9948d3e..f0787a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -112,9 +112,12 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
cmd.connectionSucceeded(connection);
// logger.debug("Finished connection succeeded activity.");
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
cmd.connectionFailed(FailureType.CONNECTION, e);
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0f095c0..2ee9263 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -70,9 +70,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
try{
writeManager.waitForWritable();
return true;
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
listener.failed(new RpcException(e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
return false;
}
}
@@ -131,10 +135,14 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
if (channel.isActive()) {
channel.close().get();
}
- } catch (InterruptedException | ExecutionException e) {
+ channel.close().get();
+ } catch (final InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
- // TODO InterruptedException
+
+ // Preserve evidence that an interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to. This handler also catches ExecutionException, which only
+ // reports that closing the channel failed and does not mean this thread was interrupted, so the interrupt
+ // flag is restored for InterruptedException only.
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 11f5496..ed31bed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -48,9 +48,12 @@ public class DataTunnel {
try{
sendingSemaphore.acquire();
manager.runCommand(b);
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
@@ -59,9 +62,12 @@ public class DataTunnel {
try{
sendingSemaphore.acquire();
manager.runCommand(b);
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
return b.getFuture();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index e7a9a3c..531253e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -271,6 +271,10 @@ public class Drillbit implements AutoCloseable {
Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
} catch (final InterruptedException e) {
logger.warn("Interrupted while sleeping during coordination deregistration.");
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
if (embeddedJetty != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 0fb778b..e52820b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.store;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import com.google.common.base.Stopwatch;
@@ -36,6 +41,8 @@ import com.google.common.collect.Lists;
*/
public abstract class TimedRunnable<V> implements Runnable {
+ // Time budget granted to each runnable, in milliseconds. The total wait for a task list is this value multiplied
+ // by the number of runnables and divided by the parallelism.
+ private static final int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+
private volatile Exception e;
private volatile long timeNanos;
private volatile V value;
@@ -91,10 +98,13 @@ public abstract class TimedRunnable<V> implements Runnable {
}
/**
- * Execute the list of runnables with the given parallelization. At end, return values and report completion time stats to provided logger.
+ * Execute the list of runnables with the given parallelization. At end, return values and report completion time
+ * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout is exceeded, existing/pending
+ * tasks will be cancelled and a {@link UserException} is thrown.
* @param activity Name of activity for reporting in logger.
* @param logger The logger to use to report results.
- * @param runnables List of runnables that should be executed and timed. If this list has one item, task will be completed in-thread.
+ * @param runnables List of runnables that should be executed and timed. If this list has one item, task will be
+ * completed in-thread. Runnable must handle {@link InterruptedException}s.
* @param parallelism The number of threads that should be run to complete this task.
* @return The list of outcome objects.
* @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
@@ -107,25 +117,43 @@ public abstract class TimedRunnable<V> implements Runnable {
runnables.get(0).run();
}else{
parallelism = Math.min(parallelism, runnables.size());
- final CountDownLatch latch = new CountDownLatch(runnables.size());
+ final ExtendedLatch latch = new ExtendedLatch(runnables.size());
final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
try{
for(TimedRunnable<V> runnable : runnables){
threadPool.submit(new LatchedRunnable(latch, runnable));
}
- }finally{
- threadPool.shutdown();
- }
- try{
- latch.await();
- }catch(InterruptedException e){
- // TODO interrupted exception.
- throw new RuntimeException(e);
+ // Total time budget for the whole task list, in milliseconds. The division is done in floating point so that
+ // Math.ceil() really rounds up (an integer division would truncate before ceil is applied) and so that the
+ // product cannot overflow int for very large task lists.
+ final long timeout = (long) Math.ceil(((double) TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size()) / parallelism);
+ if (!latch.awaitUninterruptibly(timeout)) {
+ // Issue a shutdown request. This will cause existing threads to interrupt and pending threads to cancel.
+ // It is highly important that the task Runnables are handling interrupts correctly.
+ threadPool.shutdownNow();
+
+ try {
+ // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+ // any running threads. If the runnables are handling the interrupts properly they should be able to
+ // wrap up and terminate. If not, waiting for 5s here gives a chance to identify and log any potential
+ // thread leaks.
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ }
+
+ final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
+ "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
+ logger.error(errMsg);
+ throw UserException.resourceError()
+ .message(errMsg)
+ .build();
+ }
+ } finally {
+ if (!threadPool.isShutdown()) {
+ threadPool.shutdown();
+ }
}
}
-
List<V> values = Lists.newArrayList();
long sum = 0;
long max = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index de4a181..d26e2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -48,4 +48,9 @@ public interface CountDownLatchInjection {
* Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
*/
void countDown();
+
+ /**
+ * Close the latch.
+ */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index f4012c1..561d816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -82,4 +82,9 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
latch.countDown();
}
+
+ /**
+ * Drops this injection's reference to the latch.
+ * NOTE(review): countDown() dereferences the latch, so calling it after close() would fail with a
+ * NullPointerException. Confirm that no caller uses the injection once it has been closed.
+ */
+ @Override
+ public void close() {
+ latch = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 05f8433..387d300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -124,6 +124,34 @@ public class ExecutionControlsInjector {
}
}
+ /**
+ * Pauses at the given site point if a pause injection is specified for it (i.e. matches the site description).
+ * The pause can be aborted by interrupting the paused thread with {@link Thread#interrupt()}.
+ *
+ * <p>Implementors call this at a site in their code where they want to simulate an interruptible pause during
+ * testing.
+ *
+ * @param executionControls the controls in the current context
+ * @param desc the site description
+ * @param logger logger of the class containing the injection site
+ * @throws InterruptedException if the paused thread is interrupted using {@link Thread#interrupt()}
+ */
+ public void injectInterruptiblePause(final ExecutionControls executionControls, final String desc,
+ final Logger logger) throws InterruptedException {
+ final PauseInjection injection = executionControls.lookupPauseInjection(this, desc);
+ if (injection == null) {
+ // No pause is configured for this site.
+ return;
+ }
+
+ logger.debug("Interruptible pausing at {}", desc);
+ try {
+ injection.interruptiblePause();
+ } catch (final InterruptedException interrupted) {
+ // Record the interruption for diagnostics, then let the caller decide how to react to it.
+ logger.debug("Pause interrupted at {}", desc);
+ throw interrupted;
+ }
+ logger.debug("Interruptible pause resuming at {}", desc);
+ }
+
public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
return executionControls.lookupCountDownLatchInjection(this, desc);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 33ab783..bb13d1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -61,6 +61,10 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
@Override
public void countDown() {
}
+
+ @Override
+ public void close() {
+ }
};
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index ff0340b..fc4d8ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -54,6 +54,13 @@ public class PauseInjection extends Injection {
latch.awaitUninterruptibly();
}
+ /**
+ * Waits on the pause latch (see {@link #unpause()}) when {@code injectNow()} reports that this injection should
+ * fire; otherwise returns immediately. The wait ends early if the waiting thread is interrupted.
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting on the latch
+ */
+ public void interruptiblePause() throws InterruptedException {
+ if (injectNow()) {
+ latch.await();
+ }
+ }
+
public void unpause() {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index f2352e6..1d3a0b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,13 +21,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.SelfCleaningRunnable;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -173,6 +173,10 @@ public class WorkManager implements AutoCloseable {
}
} catch (final InterruptedException e) {
logger.warn("Executor interrupted while awaiting termination");
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
@@ -180,7 +184,7 @@ public class WorkManager implements AutoCloseable {
return dContext;
}
- private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+ private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running
/**
* Waits until it is safe to exit. Blocks until all currently running fragments have completed.
@@ -193,17 +197,11 @@ public class WorkManager implements AutoCloseable {
return;
}
- exitLatch = new CountDownLatch(1);
+ exitLatch = new ExtendedLatch();
}
- while(true) {
- try {
- exitLatch.await(5, TimeUnit.SECONDS);
- } catch(final InterruptedException e) {
- // keep waiting
- }
- break;
- }
+ // Wait for at most 5 seconds or until the latch is released.
+ exitLatch.awaitUninterruptibly(5000);
}
/**
@@ -328,6 +326,10 @@ public class WorkManager implements AutoCloseable {
try {
Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
} catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
// exit status thread on interrupt.
break;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 2a79e42..07a3505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -141,7 +141,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
}
@Override
- public RawFragmentBatch getNext() throws IOException {
+ public RawFragmentBatch getNext() throws IOException, InterruptedException {
if (outOfMemory && buffer.size() < 10) {
outOfMemory = false;
fragmentManager.setAutoRead(true);
@@ -160,9 +160,12 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
}
queueSize -= w.getBodySize();
return batch;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
return null;
- // TODO InterruptedException
}
}
if (w == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index d23655c..4750666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
private static enum BufferState {
INIT,
@@ -149,7 +149,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
@Override
- public RawFragmentBatch getNext() {
+ public RawFragmentBatch getNext() throws IOException, InterruptedException {
if (outOfMemory.get() && buffer.size() < 10) {
logger.trace("Setting autoread true");
@@ -166,8 +166,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
try {
b = buffer.take();
} catch (final InterruptedException e) {
- return null;
- // TODO InterruptedException
+ logger.debug("Interrupted while waiting for incoming data.", e);
+ throw e;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0122ef8..bf62ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
private static final ObjectMapper MAPPER = new ObjectMapper();
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
- private static final int RPC_WAIT_IN_SECONDS = 90;
+ private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
private final QueryId queryId;
private final RunQuery queryRequest;
@@ -967,7 +967,8 @@ public class Foreman implements Runnable {
* count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
* know if any submissions did fail.
*/
- final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
+ final int numIntFragments = intFragmentMap.keySet().size();
+ final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
@@ -975,16 +976,17 @@ public class Foreman implements Runnable {
sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
- if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+ final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+ if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
long numberRemaining = endpointLatch.getCount();
throw UserException.connectionError()
.message(
- "Exceeded timeout while waiting send intermediate work fragments to remote nodes. Sent %d and only heard response back from %d nodes.",
- intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+ "Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+ "Sent %d and only heard response back from %d nodes.",
+ timeout, numIntFragments, numIntFragments - numberRemaining)
.build();
}
-
// if any of the intermediate fragment submissions failed, fail the query
final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
if (submissionExceptions.size() > 0) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 24e2556..d96e6d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -62,6 +62,9 @@ public class FragmentExecutor implements Runnable {
private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
+ // Thread that is currently executing the fragment. Value is null if the fragment hasn't started running or has finished.
+ private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
+
public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
final StatusReporter listener) {
this.fragmentContext = context;
@@ -136,6 +139,14 @@ public class FragmentExecutor implements Runnable {
* We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
*/
updateState(FragmentState.CANCELLATION_REQUESTED);
+
+ /*
+ * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+ */
+ final Thread myThread = myThreadRef.get();
+ if (myThread != null) {
+ myThread.interrupt();
+ }
}
}
@@ -168,6 +179,7 @@ public class FragmentExecutor implements Runnable {
@Override
public void run() {
final Thread myThread = Thread.currentThread();
+ myThreadRef.set(myThread);
final String originalThreadName = myThread.getName();
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
@@ -244,6 +256,8 @@ public class FragmentExecutor implements Runnable {
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
myThread.setName(originalThreadName);
+
+ myThreadRef.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 3e4dcb2..d72d498 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.server;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -43,6 +47,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -437,6 +445,20 @@ public class TestDrillbitResilience {
}
}
+ /**
+ * Listener that starts a {@link CancellingThread} for the query when the first batch of data arrives and releases
+ * every batch it receives.
+ */
+ private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+ // Set once the cancelling thread has been started, so that later batches do not start another one.
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ // NOTE(review): the null argument is presumably the latch, since CancellingThread skips the count-down
+ // when its latch is null. Confirm against the constructor.
+ (new CancellingThread(queryId, ex, null)).start();
+ cancelRequested = true;
+ }
+ // Release the batch whether or not a cancel was requested.
+ result.release();
+ }
+ };
+
/**
* Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
*/
@@ -459,7 +481,9 @@ public class TestDrillbitResilience {
} catch (final RpcException ex) {
this.ex.value = ex;
}
- latch.countDown();
+ if (latch != null) {
+ latch.countDown();
+ }
}
}
@@ -507,13 +531,33 @@ public class TestDrillbitResilience {
* Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
*/
private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+ assertCancelled(controls, TEST_QUERY, listener);
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
+ */
+ private static void assertCancelled(final String controls, final String testQuery,
+ final WaitUntilCompleteListener listener) {
setControls(controls);
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, testQuery, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.CANCELED);
}
+ /**
+ * Runs an "alter session set" statement for the given option through the shared client and releases the
+ * returned batches. The calling test is failed if the statement raises an {@link RpcException}.
+ *
+ * @param option name of the session option
+ * @param value value to assign, inserted into the statement as-is
+ */
+ private static void setSessionOption(final String option, final String value) {
+ final String alterSessionSql = String.format("alter session set `%s` = %s", option, value);
+ try {
+ for (final QueryDataBatch batch : drillClient.runQuery(QueryType.SQL, alterSessionSql)) {
+ batch.release();
+ }
+ } catch (final RpcException rpcFailure) {
+ fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, rpcFailure.toString()));
+ }
+ }
+
private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
return "{\"injections\" : [{"
+ "\"type\" : \"pause\"," +
@@ -667,4 +711,73 @@ public class TestDrillbitResilience {
final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
}
+
+ /**
+ * Tests that cancelling a query interrupts FragmentExecutor threads that are currently blocked waiting for some
+ * event to happen. This case cancels a fragment whose {@link MergingRecordBatch} is blocked waiting for data.
+ */
+ @Test
+ public void testInterruptingBlockedMergingRecordBatch() {
+ testInterruptingBlockedFragmentsWaitingForData(
+ createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1));
+ }
+
+ /**
+ * Tests that cancelling a query interrupts FragmentExecutor threads that are currently blocked waiting for some
+ * event to happen. This case cancels a fragment whose {@link UnorderedReceiverBatch} is blocked waiting for data.
+ */
+ @Test
+ public void testInterruptingBlockedUnorderedReceiverBatch() {
+ testInterruptingBlockedFragmentsWaitingForData(
+ createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1));
+ }
+
+ /**
+ * Runs a GROUP BY query under the given injection controls with a listener that cancels the query once the first
+ * batch of data arrives, and asserts that the query completes in the CANCELED state. The session options changed
+ * for the run are reset to their defaults afterwards, even if the assertion fails.
+ *
+ * @param control injection controls to apply before running the query
+ */
+ private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
+ final String groupByQuery = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+ try {
+ setSessionOption(SLICE_TARGET, "1");
+ setSessionOption(HASHAGG.getOptionName(), "false");
+
+ final WaitUntilCompleteListener cancellingListener = new ListenerThatCancelsQueryAfterFirstBatchOfData();
+ assertCancelled(control, groupByQuery, cancellingListener);
+ } finally {
+ // Restore the defaults so that later tests are not affected.
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+ }
+ }
+
+ /**
+ * Tests interrupting the fragment thread that is running {@link PartitionSenderRootExec}.
+ * {@link PartitionSenderRootExec} spawns threads for partitioner. Interrupting fragment thread should also interrupt
+ * the partitioner threads.
+ */
+ @Test
+ public void testInterruptingPartitionerThreadFragment() {
+ try {
+ // Session options used for this run; each one is reset to its default in the finally block.
+ setSessionOption(SLICE_TARGET, "1");
+ setSessionOption(HASHAGG.getOptionName(), "true");
+ setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+
+ // Two injections at PartitionerDecorator sites: a latch ("partitioner-sender-latch") and a pause
+ // ("wait-for-fragment-interrupt") with nSkip set to 1.
+ final String controls = "{\"injections\" : ["
+ + "{"
+ + "\"type\" : \"latch\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"partitioner-sender-latch\""
+ + "},"
+ + "{"
+ + "\"type\" : \"pause\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"wait-for-fragment-interrupt\","
+ + "\"nSkip\" : 1"
+ + "}" +
+ "]}";
+
+ // The listener cancels the query once the first batch arrives; the query must then end up CANCELED.
+ final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+ assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+ } finally {
+ // Restore the defaults so that later tests are not affected.
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+ setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(),
+ Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
new file mode 100644
index 0000000..2807c35
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.test.DrillTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit testing for {@link TimedRunnable}.
+ */
+public class TestTimedRunnable extends DrillTest {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+
+ @Rule
+ public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
+
+ private static class TestTask extends TimedRunnable {
+ final long sleepTime; // sleep time in ms
+
+ public TestTask(final long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ protected Void runInner() throws Exception {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ return null;
+ }
+
+ @Override
+ protected IOException convertToIOException(Exception e) {
+ return new IOException("Failure while trying to sleep for sometime", e);
+ }
+ }
+
+ @Test
+ public void withoutAnyTasksTriggeringTimeout() throws Exception {
+ List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+ for(int i=0; i<100; i++){
+ tasks.add(new TestTask(2000));
+ }
+
+ TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16);
+ }
+
+ @Test
+ public void withTasksExceedingTimeout() throws Exception {
+ UserException ex = null;
+
+ try {
+ List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+ for (int i = 0; i < 100; i++) {
+ if ((i & (i + 1)) == 0) {
+ tasks.add(new TestTask(2000));
+ } else {
+ tasks.add(new TestTask(20000));
+ }
+ }
+
+ TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+ } catch (UserException e) {
+ ex = e;
+ }
+
+ assertNotNull("Expected a UserException", ex);
+ assertThat(ex.getMessage(),
+ containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
+ "complete. Total runnable size 100, parallelism 16."));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 8ef2af3..2fe6c28 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -156,6 +156,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// calling some wait method?
resultsListener.latch.await();
} catch ( InterruptedException e ) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
throw new SQLException( "Interrupted", e );