You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/05/10 22:45:06 UTC
[1/3] drill git commit: DRILL-2697: Pause sites wait indefinitely
for a resume signal. DrillClient sends a resume signal to UserServer.
UserServer triggers a resume call in the correct Foreman. Foreman resumes all
pauses related to the query through the ...
Repository: drill
Updated Branches:
refs/heads/master 4e596334e -> 3a294abcc
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index da69e9e..3e4dcb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -25,12 +25,13 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import com.google.common.base.Preconditions;
import org.apache.commons.math3.util.Pair;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -60,7 +61,9 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.store.pojo.PojoRecordReader;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -68,15 +71,14 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
/**
* Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
+ * execution by injecting exceptions at various points and to cancellations in various phases.
+ * The test cases are mentioned in DRILL-2383.
*/
-@Ignore
public class TestDrillbitResilience {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
@@ -90,7 +92,6 @@ public class TestDrillbitResilience {
* counting sys.drillbits.
*/
private static final String TEST_QUERY = "select * from sys.memory";
- private static final long PAUSE_TIME_MILLIS = 3000L;
private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
if (drillbits.containsKey(name)) {
@@ -184,18 +185,17 @@ public class TestDrillbitResilience {
}
/**
- * Clear all exceptions.
+ * Clear all injections.
*/
private static void clearAllInjections() {
- assertTrue(drillClient != null);
+ Preconditions.checkNotNull(drillClient);
ControlsInjectionUtil.clearControls(drillClient);
}
/**
* Check that all the drillbits are ok.
* <p/>
- * <p>The current implementation does this by counting the number of drillbits using a
- * query.
+ * <p>The current implementation does this by counting the number of drillbits using a query.
*/
private static void assertDrillbitsOk() {
final SingleRowListener listener = new SingleRowListener() {
@@ -245,13 +245,14 @@ public class TestDrillbitResilience {
try {
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
listener.waitForCompletion();
- assertTrue(listener.getQueryState() == QueryState.COMPLETED);
+ final QueryState state = listener.getQueryState();
+ assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED);
} catch (final Exception e) {
throw new RuntimeException("Couldn't query active drillbits", e);
}
final List<DrillPBError> errorList = listener.getErrorList();
- assertTrue(errorList.isEmpty());
+ assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
}
@SuppressWarnings("static-method")
@@ -262,10 +263,10 @@ public class TestDrillbitResilience {
}
/**
- * Set the given exceptions.
+ * Set the given controls.
*/
- private static void setExceptions(final String controlsString) {
- ControlsInjectionUtil.setControls(drillClient, controlsString);
+ private static void setControls(final String controls) {
+ ControlsInjectionUtil.setControls(drillClient, controls);
}
/**
@@ -332,16 +333,16 @@ public class TestDrillbitResilience {
*/
private static void assertExceptionInjected(final Throwable throwable,
final Class<? extends Throwable> exceptionClass, final String desc) {
- assertTrue(throwable instanceof UserException);
+ assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
- assertEquals(exceptionClass.getName(), cause.getExceptionClass());
- assertEquals(desc, cause.getMessage());
+ assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
+ assertEquals("Exception sites should match.", desc, cause.getMessage());
}
@Test
public void settingNoopInjectionsAndQuery() {
final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
- setExceptions(controls);
+ setControls(controls);
try {
QueryTestUtil.test(drillClient, TEST_QUERY);
} catch (final Exception e) {
@@ -357,7 +358,7 @@ public class TestDrillbitResilience {
*/
private static void testForeman(final String desc) {
final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
- setExceptions(controls);
+ setControls(controls);
try {
QueryTestUtil.test(drillClient, TEST_QUERY);
fail();
@@ -372,32 +373,39 @@ public class TestDrillbitResilience {
testForeman("run-try-beginning");
}
- /*
- * TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way
- * that it gates input via stateListener. That could be tricky, since some results could be
- * queued up before Foreman has gotten through it's run(), and they would all have to be sent
- * before the gate is opened. There's also the question of what to do in case we detect failure
- * there after some data has been sent. Right now, this test doesn't work because that's
- * exactly what happens, and the client believes that the query succeeded, even though an exception
- * was thrown after setup completed, but data was asynchronously sent to the client before that.
- * This test also revealed that the QueryState never seems to make it to the client, so we can't
- * detect the failure that way (see SingleRowListener's getQueryState(), which I originally tried
- * to use here to detect query completion).
- */
@SuppressWarnings("static-method")
@Test
public void foreman_runTryEnd() {
testForeman("run-try-end");
}
+ /**
+ * Tests can use this listener to wait, until the submitted query completes or fails, by
+ * calling #waitForCompletion.
+ */
private static class WaitUntilCompleteListener implements UserResultsListener {
- protected final CountDownLatch latch;
+ private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion
protected QueryId queryId = null;
- protected Exception ex = null;
- protected QueryState state = null;
+ protected volatile Pointer<Exception> ex = new Pointer<>();
+ protected volatile QueryState state = null;
+
+ /**
+ * Method that sets the exception if the condition is not met.
+ */
+ protected final void check(final boolean condition, final String format, final Object... args) {
+ if (!condition) {
+ ex.value = new IllegalStateException(String.format(format, args));
+ }
+ }
- public WaitUntilCompleteListener(final int count) {
- latch = new CountDownLatch(count);
+ /**
+ * Method that cancels and resumes the query, in order.
+ */
+ protected final void cancelAndResume() {
+ Preconditions.checkNotNull(queryId);
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ (new CancellingThread(queryId, ex, trigger)).start();
+ (new ResumingThread(queryId, ex, trigger)).start();
}
@Override
@@ -407,7 +415,7 @@ public class TestDrillbitResilience {
@Override
public void submissionFailed(final UserException ex) {
- this.ex = ex;
+ this.ex.value = ex;
state = QueryState.FAILED;
latch.countDown();
}
@@ -424,21 +432,23 @@ public class TestDrillbitResilience {
}
public final Pair<QueryState, Exception> waitForCompletion() {
- try {
- latch.await();
- } catch (final InterruptedException e) {
- return new Pair<QueryState, Exception>(state, e);
- }
- return new Pair<>(state, ex);
+ latch.awaitUninterruptibly();
+ return new Pair<>(state, ex.value);
}
}
+ /**
+ * Thread that cancels the query with the given id. The latch is counted down once the cancel is acknowledged or fails with an RpcException.
+ */
private static class CancellingThread extends Thread {
-
private final QueryId queryId;
+ private final Pointer<Exception> ex;
+ private final ExtendedLatch latch;
- public CancellingThread(final QueryId queryId) {
+ public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
this.queryId = queryId;
+ this.ex = ex;
+ this.latch = latch;
}
@Override
@@ -446,139 +456,178 @@ public class TestDrillbitResilience {
final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
try {
cancelAck.checkedGet();
- } catch (final RpcException e) {
- fail(e.getMessage()); // currently this failure does not fail the test
+ } catch (final RpcException ex) {
+ this.ex.value = ex;
}
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Thread that resumes the query with the given id. The thread waits uninterruptibly until the latch is counted
+ * down, and only then sends the resume signal.
+ */
+ private static class ResumingThread extends Thread {
+ private final QueryId queryId;
+ private final Pointer<Exception> ex;
+ private final ExtendedLatch latch;
+
+ public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) {
+ this.queryId = queryId;
+ this.ex = ex;
+ this.latch = latch;
+ }
+
+ @Override
+ public void run() {
+ latch.awaitUninterruptibly();
+ final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId);
+ try {
+ resumeAck.checkedGet();
+ } catch (final RpcException ex) {
+ this.ex.value = ex;
+ }
+ }
+ }
+
+ /**
+ * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the state is not
+ * as expected or if an exception is thrown.
+ */
+ private static void assertCompleteState(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+ final QueryState actualState = result.getFirst();
+ final Exception exception = result.getSecond();
+ if (actualState != expectedState || exception != null) {
+ fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s",
+ expectedState, actualState, exception == null ? "none." : exception));
}
}
/**
* Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
*/
- private static void assertCancelled(final String controls, final WaitUntilCompleteListener listener) {
- ControlsInjectionUtil.setControls(drillClient, controls);
+ private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+ setControls(controls);
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
- assertTrue(String.format("Expected Query Outcome of CANCELED but had Outcome of %s", result.getFirst()),
- result.getFirst() == QueryState.CANCELED);
- assertTrue(String.format("Expected no Exception but had Exception %s", result.getSecond()),
- result.getSecond() == null);
+ assertCompleteState(result, QueryState.CANCELED);
}
- @Test // Cancellation TC 1
- public void cancelBeforeAnyResultsArrive() {
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
+ return "{\"injections\" : [{"
+ + "\"type\" : \"pause\"," +
+ "\"siteClass\" : \"" + siteClass.getName() + "\","
+ + "\"desc\" : \"" + siteDesc + "\","
+ + "\"nSkip\" : " + nSkip
+ + "}]}";
+ }
+
+ private static String createPauseInjection(final Class siteClass, final String siteDesc) {
+ return createPauseInjection(siteClass, siteDesc, 0);
+ }
+ @Test // To test pause and resume. Test hangs if resume did not happen.
+ public void passThrough() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@Override
public void queryIdArrived(final QueryId queryId) {
- (new CancellingThread(queryId)).start();
+ super.queryIdArrived(queryId);
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ (new ResumingThread(queryId, ex, trigger)).start();
+ trigger.countDown();
}
};
- final String controls = "{\"injections\":[{"
- + "\"type\":\"pause\"," +
- "\"siteClass\":\"" + Foreman.class.getName() + "\","
- + "\"desc\":\"pause-run-plan\","
- + "\"millis\":" + PAUSE_TIME_MILLIS + ","
- + "\"nSkip\":0,"
- + "\"nFire\":1"
- + "}]}";
+ final String controls = createPauseInjection(PojoRecordReader.class, "read-next");
+ setControls(controls);
+
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ assertCompleteState(result, QueryState.COMPLETED);
+ }
+
+ @Test // Cancellation TC 1: cancel before any result set is returned
+ public void cancelBeforeAnyResultsArrive() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
- assertCancelled(controls, listener);
+ @Override
+ public void queryIdArrived(final QueryId queryId) {
+ super.queryIdArrived(queryId);
+ cancelAndResume();
+ }
+ };
+
+ final String controls = createPauseInjection(Foreman.class, "foreman-ready");
+ assertCancelledWithoutException(controls, listener);
}
- @Test // Cancellation TC 2
+ @Test // Cancellation TC 2: cancel in the middle of fetching result set
public void cancelInMiddleOfFetchingResults() {
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private boolean cancelRequested = false;
@Override
- public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- if (! cancelRequested) {
- assertTrue(queryId != null);
- (new CancellingThread(queryId)).start();
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ cancelAndResume();
cancelRequested = true;
}
result.release();
}
};
- final String controls = "{\"injections\":[{"
- + "\"type\":\"pause\"," +
- "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
- + "\"desc\":\"sending-data\","
- + "\"millis\":" + PAUSE_TIME_MILLIS + ","
- + "\"nSkip\":0,"
- + "\"nFire\":1"
- + "}]}";
-
- assertCancelled(controls, listener);
+ // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
+ final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+ assertCancelledWithoutException(controls, listener);
}
- @Test // Cancellation TC 3
+ @Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched
public void cancelAfterAllResultsProduced() {
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@Override
- public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
if (++count == drillbits.size()) {
- assertTrue(queryId != null);
- (new CancellingThread(queryId)).start();
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ cancelAndResume();
}
result.release();
}
};
- final String controls = "{\"injections\":[{"
- + "\"type\":\"pause\"," +
- "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
- + "\"desc\":\"send-complete\","
- + "\"millis\":" + PAUSE_TIME_MILLIS + ","
- + "\"nSkip\":0,"
- + "\"nFire\":1"
- + "}]}";
-
- assertCancelled(controls, listener);
+ final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
+ assertCancelledWithoutException(controls, listener);
}
- @Test // Cancellation TC 4
+ @Test // Cancellation TC 4: cancel after everything is completed and fetched
public void cancelAfterEverythingIsCompleted() {
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@Override
- public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
if (++count == drillbits.size()) {
- assertTrue(queryId != null);
- (new CancellingThread(queryId)).start();
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ cancelAndResume();
}
result.release();
}
};
- final String controls = "{\"injections\":[{"
- + "\"type\":\"pause\"," +
- "\"siteClass\":\"" + Foreman.class.getName() + "\","
- + "\"desc\":\"foreman-cleanup\","
- + "\"millis\":" + PAUSE_TIME_MILLIS + ","
- + "\"nSkip\":0,"
- + "\"nFire\":1"
- + "}]}";
-
- assertCancelled(controls, listener);
+ final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
+ assertCancelledWithoutException(controls, listener);
}
- @Test // Completion TC 1
+ @Test // Completion TC 1: success
public void successfullyCompletes() {
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
- QueryTestUtil.testWithListener(
- drillClient, QueryType.SQL, TEST_QUERY, listener);
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
- assertTrue(result.getFirst() == QueryState.COMPLETED);
- assertTrue(result.getSecond() == null);
+ assertCompleteState(result, QueryState.COMPLETED);
}
/**
@@ -586,16 +635,16 @@ public class TestDrillbitResilience {
*/
private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
final String exceptionDesc) {
- setExceptions(controls);
- final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ setControls(controls);
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
- assertTrue(result.getFirst() == QueryState.FAILED);
- final Exception e = result.getSecond();
- assertExceptionInjected(e, exceptionClass, exceptionDesc);
+ final QueryState state = result.getFirst();
+ assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
+ assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
}
- @Test // Completion TC 2
+ @Test // Completion TC 2: failed query - before query is executed - while sql parsing
public void failsWhenParsing() {
final String exceptionDesc = "sql-parsing";
final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
@@ -603,7 +652,7 @@ public class TestDrillbitResilience {
assertFailsWithException(controls, exceptionClass, exceptionDesc);
}
- @Test // Completion TC 3
+ @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
public void failsWhenSendingFragments() {
final String exceptionDesc = "send-fragments";
final Class<? extends Throwable> exceptionClass = ForemanException.class;
@@ -611,7 +660,7 @@ public class TestDrillbitResilience {
assertFailsWithException(controls, exceptionClass, exceptionDesc);
}
- @Test // Completion TC 4
+ @Test // Completion TC 4: failed query - during query execution
public void failsDuringExecution() {
final String exceptionDesc = "fragment-execution";
final Class<? extends Throwable> exceptionClass = IOException.class;
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
new file mode 100644
index 0000000..c98f54c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.util.Pointer;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCountDownLatchInjection extends BaseTestQuery {
+
+ private static final UserSession session = UserSession.Builder.newBuilder()
+ .withCredentials(UserCredentials.newBuilder()
+ .setUserName("foo")
+ .build())
+ .withUserProperties(UserProperties.getDefaultInstance())
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
+
+ /**
+ * Class in whose methods we want to simulate count down latches at run-time for testing
+ * purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
+ * {@link org.apache.drill.exec.ops.FragmentContext}.
+ */
+ private static class DummyClass {
+ private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+
+ private final QueryContext context;
+ private final CountDownLatch latch;
+ private final int count;
+
+ public DummyClass(final QueryContext context, final CountDownLatch latch, final int count) {
+ this.context = context;
+ this.latch = latch;
+ this.count = count;
+ }
+
+ public static final String LATCH_NAME = "<<latch>>";
+
+ /**
+ * Method that initializes the latch with "count" and then waits for that many count downs (one from each thread)
+ */
+ public long initAndWait() throws InterruptedException {
+ // ... code ...
+
+ injector.getLatch(context.getExecutionControls(), LATCH_NAME).initialize(count);
+
+ // ... code ...
+ latch.countDown(); // trigger threads spawn
+
+ final long startTime = System.currentTimeMillis();
+ // simulated wait for "count" threads to count down on the same latch
+ injector.getLatch(context.getExecutionControls(), LATCH_NAME).await();
+ final long endTime = System.currentTimeMillis();
+ // ... code ...
+ return (endTime - startTime);
+ }
+
+ public void countDown() {
+ // ... code ...
+ injector.getLatch(context.getExecutionControls(), LATCH_NAME).countDown();
+ // ... code ...
+ }
+ }
+
+ private static class ThreadCreator extends Thread {
+
+ private final DummyClass dummyClass;
+ private final ExtendedLatch latch;
+ private final int count;
+ private final Pointer<Long> countingDownTime;
+
+ public ThreadCreator(final DummyClass dummyClass, final ExtendedLatch latch, final int count,
+ final Pointer<Long> countingDownTime) {
+ this.dummyClass = dummyClass;
+ this.latch = latch;
+ this.count = count;
+ this.countingDownTime = countingDownTime;
+ }
+
+ @Override
+ public void run() {
+ latch.awaitUninterruptibly();
+ final long startTime = System.currentTimeMillis();
+ for (int i = 0; i < count; i++) {
+ (new Thread() {
+ @Override
+ public void run() {
+ dummyClass.countDown();
+ }
+ }).start();
+ }
+ final long endTime = System.currentTimeMillis();
+ countingDownTime.value = (endTime - startTime);
+ }
+ }
+
+ @Test // test would hang if the correct init, wait and countdowns did not happen, and the test timeout mechanism will
+ // catch that case
+ public void latchInjected() {
+ final int threads = 10;
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ final Pointer<Long> countingDownTime = new Pointer<>();
+
+ final String jsonString = "{\"injections\":[{"
+ + "\"type\":\"latch\"," +
+ "\"siteClass\":\"org.apache.drill.exec.testing.TestCountDownLatchInjection$DummyClass\","
+ + "\"desc\":\"" + DummyClass.LATCH_NAME + "\""
+ + "}]}";
+
+ ControlsInjectionUtil.setControls(session, jsonString);
+
+ final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+
+ final DummyClass dummyClass = new DummyClass(queryContext, trigger, threads);
+ (new ThreadCreator(dummyClass, trigger, threads, countingDownTime)).start();
+ final long timeSpentWaiting;
+ try {
+ timeSpentWaiting = dummyClass.initAndWait();
+ } catch (final InterruptedException e) {
+ fail("Thread should not be interrupted; there is no deliberate attempt.");
+ return;
+ }
+ assertTrue(timeSpentWaiting >= countingDownTime.value);
+ try {
+ queryContext.close();
+ } catch (final Exception e) {
+ fail("Failed to close query context: " + e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 5fa2b3f..ba29c58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -18,20 +18,33 @@
package org.apache.drill.exec.testing;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.Pointer;
import org.junit.Test;
import org.slf4j.Logger;
+import java.util.concurrent.CountDownLatch;
+
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestPauseInjection extends BaseTestQuery {
private static final UserSession session = UserSession.Builder.newBuilder()
- .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+ .withCredentials(UserCredentials.newBuilder()
+ .setUserName("foo")
+ .build())
.withUserProperties(UserProperties.getDefaultInstance())
.withOptionManager(bits[0].getContext().getOptionManager())
.build();
@@ -46,9 +59,11 @@ public class TestPauseInjection extends BaseTestQuery {
private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
private final QueryContext context;
+ private final CountDownLatch latch;
- public DummyClass(final QueryContext context) {
+ public DummyClass(final QueryContext context, final CountDownLatch latch) {
this.context = context;
+ this.latch = latch;
}
public static final String PAUSES = "<<pauses>>";
@@ -61,6 +76,7 @@ public class TestPauseInjection extends BaseTestQuery {
public long pauses() {
// ... code ...
+ latch.countDown();
final long startTime = System.currentTimeMillis();
// simulated pause
injector.injectPause(context.getExecutionControls(), PAUSES, logger);
@@ -71,30 +87,136 @@ public class TestPauseInjection extends BaseTestQuery {
}
}
+ private static class ResumingThread extends Thread {
+
+ private final QueryContext context;
+ private final ExtendedLatch latch;
+ private final Pointer<Exception> ex;
+ private final long millis;
+
+ public ResumingThread(final QueryContext context, final ExtendedLatch latch, final Pointer<Exception> ex,
+ final long millis) {
+ this.context = context;
+ this.latch = latch;
+ this.ex = ex;
+ this.millis = millis;
+ }
+
+ @Override
+ public void run() {
+ latch.awaitUninterruptibly();
+ try {
+ Thread.sleep(millis);
+ } catch (final InterruptedException ex) {
+ this.ex.value = ex;
+ }
+ context.getExecutionControls().unpauseAll();
+ }
+ }
+
@Test
public void pauseInjected() {
- final long pauseMillis = 1000L;
+ final long expectedDuration = 1000L;
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ final Pointer<Exception> ex = new Pointer<>();
+
final String jsonString = "{\"injections\":[{"
+ "\"type\":\"pause\"," +
"\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+ "\"desc\":\"" + DummyClass.PAUSES + "\","
- + "\"millis\":" + pauseMillis + ","
- + "\"nSkip\":0,"
- + "\"nFire\":1"
+ + "\"nSkip\":0"
+ "}]}";
ControlsInjectionUtil.setControls(session, jsonString);
final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+ (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
// test that the pause happens
- final DummyClass dummyClass = new DummyClass(queryContext);
- final long time = dummyClass.pauses();
- assertTrue((time >= pauseMillis));
+ final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+ final long actualDuration = dummyClass.pauses();
+ assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+ expectedDuration <= actualDuration);
+ assertTrue("No exception should be thrown.", ex.value == null);
try {
queryContext.close();
- } catch (Exception e) {
- fail();
+ } catch (final Exception e) {
+ fail("Failed to close query context: " + e);
+ }
+ }
+
+ @Test
+ public void pauseOnSpecificBit() {
+ final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+ final ZookeeperHelper zkHelper = new ZookeeperHelper();
+ zkHelper.startZookeeper(1);
+
+ // Creating two drillbits
+ final Drillbit drillbit1, drillbit2;
+ final DrillConfig drillConfig = zkHelper.getConfig();
+ try {
+ drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+ drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+ } catch (final DrillbitStartupException e) {
+ throw new RuntimeException("Failed to start two drillbits.", e);
+ }
+
+ final DrillbitContext drillbitContext1 = drillbit1.getContext();
+ final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+ final UserSession session = UserSession.Builder.newBuilder()
+ .withCredentials(UserCredentials.newBuilder()
+ .setUserName("foo")
+ .build())
+ .withUserProperties(UserProperties.getDefaultInstance())
+ .withOptionManager(drillbitContext1.getOptionManager())
+ .build();
+
+ final DrillbitEndpoint drillbitEndpoint1 = drillbitContext1.getEndpoint();
+ final String jsonString = "{\"injections\":[{"
+ + "\"type\" : \"pause\"," +
+ "\"siteClass\" : \"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+ + "\"desc\" : \"" + DummyClass.PAUSES + "\","
+ + "\"nSkip\" : 0, "
+ + "\"address\" : \"" + drillbitEndpoint1.getAddress() + "\","
+ + "\"port\" : " + drillbitEndpoint1.getUserPort()
+ + "}]}";
+
+ ControlsInjectionUtil.setControls(session, jsonString);
+
+ {
+ final long expectedDuration = 1000L;
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ final Pointer<Exception> ex = new Pointer<>();
+ final QueryContext queryContext = new QueryContext(session, drillbitContext1);
+ (new ResumingThread(queryContext, trigger, ex, expectedDuration)).start();
+
+ // test that the pause happens
+ final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+ final long actualDuration = dummyClass.pauses();
+ assertTrue(String.format("Test should stop for at least %d milliseconds.", expectedDuration),
+ expectedDuration <= actualDuration);
+ assertTrue("No exception should be thrown.", ex.value == null);
+ try {
+ queryContext.close();
+ } catch (final Exception e) {
+ fail("Failed to close query context: " + e);
+ }
+ }
+
+ {
+ final ExtendedLatch trigger = new ExtendedLatch(1);
+ final QueryContext queryContext = new QueryContext(session, drillbitContext2);
+
+ // if the resume did not happen, the test would hang
+ final DummyClass dummyClass = new DummyClass(queryContext, trigger);
+ dummyClass.pauses();
+ try {
+ queryContext.close();
+ } catch (final Exception e) {
+ fail("Failed to close query context: " + e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
index 470e976..b428337 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
@@ -47,13 +47,13 @@ public final class BitControl {
*/
GOODBYE(2, 2),
/**
- * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+ * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
*
* <pre>
* bit requests
* </pre>
*/
- REQ_INIATILIZE_FRAGMENTS(3, 3),
+ REQ_INITIALIZE_FRAGMENTS(3, 3),
/**
* <code>REQ_CANCEL_FRAGMENT = 6;</code>
*
@@ -91,25 +91,33 @@ public final class BitControl {
*/
REQ_QUERY_CANCEL(9, 15),
/**
+ * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+ *
+ * <pre>
+ * send a resume message for a fragment, returns Ack
+ * </pre>
+ */
+ REQ_UNPAUSE_FRAGMENT(10, 16),
+ /**
* <code>RESP_FRAGMENT_HANDLE = 11;</code>
*
* <pre>
* bit responses
* </pre>
*/
- RESP_FRAGMENT_HANDLE(10, 11),
+ RESP_FRAGMENT_HANDLE(11, 11),
/**
* <code>RESP_FRAGMENT_STATUS = 12;</code>
*/
- RESP_FRAGMENT_STATUS(11, 12),
+ RESP_FRAGMENT_STATUS(12, 12),
/**
* <code>RESP_BIT_STATUS = 13;</code>
*/
- RESP_BIT_STATUS(12, 13),
+ RESP_BIT_STATUS(13, 13),
/**
* <code>RESP_QUERY_STATUS = 14;</code>
*/
- RESP_QUERY_STATUS(13, 14),
+ RESP_QUERY_STATUS(14, 14),
;
/**
@@ -125,13 +133,13 @@ public final class BitControl {
*/
public static final int GOODBYE_VALUE = 2;
/**
- * <code>REQ_INIATILIZE_FRAGMENTS = 3;</code>
+ * <code>REQ_INITIALIZE_FRAGMENTS = 3;</code>
*
* <pre>
* bit requests
* </pre>
*/
- public static final int REQ_INIATILIZE_FRAGMENTS_VALUE = 3;
+ public static final int REQ_INITIALIZE_FRAGMENTS_VALUE = 3;
/**
* <code>REQ_CANCEL_FRAGMENT = 6;</code>
*
@@ -169,6 +177,14 @@ public final class BitControl {
*/
public static final int REQ_QUERY_CANCEL_VALUE = 15;
/**
+ * <code>REQ_UNPAUSE_FRAGMENT = 16;</code>
+ *
+ * <pre>
+ * send a resume message for a fragment, returns Ack
+ * </pre>
+ */
+ public static final int REQ_UNPAUSE_FRAGMENT_VALUE = 16;
+ /**
* <code>RESP_FRAGMENT_HANDLE = 11;</code>
*
* <pre>
@@ -197,13 +213,14 @@ public final class BitControl {
case 0: return HANDSHAKE;
case 1: return ACK;
case 2: return GOODBYE;
- case 3: return REQ_INIATILIZE_FRAGMENTS;
+ case 3: return REQ_INITIALIZE_FRAGMENTS;
case 6: return REQ_CANCEL_FRAGMENT;
case 7: return REQ_RECEIVER_FINISHED;
case 8: return REQ_FRAGMENT_STATUS;
case 9: return REQ_BIT_STATUS;
case 10: return REQ_QUERY_STATUS;
case 15: return REQ_QUERY_CANCEL;
+ case 16: return REQ_UNPAUSE_FRAGMENT;
case 11: return RESP_FRAGMENT_HANDLE;
case 12: return RESP_FRAGMENT_STATUS;
case 13: return RESP_BIT_STATUS;
@@ -7395,16 +7412,17 @@ public final class BitControl {
"oint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_tim" +
"e\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030",
"\001 \001(\0132\030.exec.bit.FragmentHandle\022(\n\006sende" +
- "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\271\002\n\007Rp" +
+ "r\030\002 \001(\0132\030.exec.bit.FragmentHandle*\323\002\n\007Rp" +
"cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE" +
- "\020\002\022\034\n\030REQ_INIATILIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
+ "\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_" +
"CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISH" +
"ED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT" +
"_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_Q" +
- "UERY_CANCEL\020\017\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022" +
- "\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_ST" +
- "ATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016B+\n\033org.apa",
- "che.drill.exec.protoB\nBitControlH\001"
+ "UERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022" +
+ "\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP_FRAGME" +
+ "NT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022\025\n\021RESP",
+ "_QUERY_STATUS\020\016B+\n\033org.apache.drill.exec" +
+ ".protoB\nBitControlH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index c3ff58b..afe8bfe 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -67,25 +67,33 @@ public final class UserProtos {
*/
REQUEST_RESULTS(5, 5),
/**
+ * <code>RESUME_PAUSED_QUERY = 11;</code>
+ *
+ * <pre>
+ * user is sending a query resume request to the drillbit
+ * </pre>
+ */
+ RESUME_PAUSED_QUERY(6, 11),
+ /**
* <code>QUERY_DATA = 6;</code>
*
* <pre>
* bit to user
* </pre>
*/
- QUERY_DATA(6, 6),
+ QUERY_DATA(7, 6),
/**
* <code>QUERY_HANDLE = 7;</code>
*/
- QUERY_HANDLE(7, 7),
+ QUERY_HANDLE(8, 7),
/**
* <code>REQ_META_FUNCTIONS = 8;</code>
*/
- REQ_META_FUNCTIONS(8, 8),
+ REQ_META_FUNCTIONS(9, 8),
/**
* <code>RESP_FUNCTION_LIST = 9;</code>
*/
- RESP_FUNCTION_LIST(9, 9),
+ RESP_FUNCTION_LIST(10, 9),
/**
* <code>QUERY_RESULT = 10;</code>
*
@@ -93,7 +101,7 @@ public final class UserProtos {
* drillbit is reporting a query status change, most likely a terminal message, to the user
* </pre>
*/
- QUERY_RESULT(10, 10),
+ QUERY_RESULT(11, 10),
;
/**
@@ -129,6 +137,14 @@ public final class UserProtos {
*/
public static final int REQUEST_RESULTS_VALUE = 5;
/**
+ * <code>RESUME_PAUSED_QUERY = 11;</code>
+ *
+ * <pre>
+ * user is sending a query resume request to the drillbit
+ * </pre>
+ */
+ public static final int RESUME_PAUSED_QUERY_VALUE = 11;
+ /**
* <code>QUERY_DATA = 6;</code>
*
* <pre>
@@ -168,6 +184,7 @@ public final class UserProtos {
case 3: return RUN_QUERY;
case 4: return CANCEL_QUERY;
case 5: return REQUEST_RESULTS;
+ case 11: return RESUME_PAUSED_QUERY;
case 6: return QUERY_DATA;
case 7: return QUERY_HANDLE;
case 8: return REQ_META_FUNCTIONS;
@@ -4986,16 +5003,17 @@ public final class UserProtos {
"n\030\003 \001(\t\"|\n\022BitToUserHandshake\022\023\n\013rpc_ver" +
"sion\030\002 \001(\005\022*\n\006status\030\003 \001(\0162\032.exec.user.H" +
"andshakeStatus\022\017\n\007errorId\030\004 \001(\t\022\024\n\014error" +
- "Message\030\005 \001(\t*\310\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
+ "Message\030\005 \001(\t*\341\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000",
"\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY\020\003\022\020\n" +
- "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\016\n\n" +
- "QUERY_DATA\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n\022REQ_ME" +
- "TA_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_LIST\020\t\022\020" +
- "\n\014QUERY_RESULT\020\n*#\n\020QueryResultsMode\022\017\n\013" +
- "STREAM_FULL\020\001*^\n\017HandshakeStatus\022\013\n\007SUCC" +
- "ESS\020\001\022\030\n\024RPC_VERSION_MISMATCH\020\002\022\017\n\013AUTH_" +
- "FAILED\020\003\022\023\n\017UNKNOWN_FAILURE\020\004B+\n\033org.apa" +
- "che.drill.exec.protoB\nUserProtosH\001"
+ "\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020\005\022\027\n\023" +
+ "RESUME_PAUSED_QUERY\020\013\022\016\n\nQUERY_DATA\020\006\022\020\n" +
+ "\014QUERY_HANDLE\020\007\022\026\n\022REQ_META_FUNCTIONS\020\010\022" +
+ "\026\n\022RESP_FUNCTION_LIST\020\t\022\020\n\014QUERY_RESULT\020" +
+ "\n*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL\020\001*^" +
+ "\n\017HandshakeStatus\022\013\n\007SUCCESS\020\001\022\030\n\024RPC_VE" +
+ "RSION_MISMATCH\020\002\022\017\n\013AUTH_FAILED\020\003\022\023\n\017UNK" +
+ "NOWN_FAILURE\020\004B+\n\033org.apache.drill.exec." +
+ "protoB\nUserProtosH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
index 4d03073..6687a86 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java
@@ -28,6 +28,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
RUN_QUERY(3),
CANCEL_QUERY(4),
REQUEST_RESULTS(5),
+ RESUME_PAUSED_QUERY(11),
QUERY_DATA(6),
QUERY_HANDLE(7),
REQ_META_FUNCTIONS(8),
@@ -61,6 +62,7 @@ public enum RpcType implements com.dyuproject.protostuff.EnumLite<RpcType>
case 8: return REQ_META_FUNCTIONS;
case 9: return RESP_FUNCTION_LIST;
case 10: return QUERY_RESULT;
+ case 11: return RESUME_PAUSED_QUERY;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/BitControl.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto
index 93bc33c..c9295f0 100644
--- a/protocol/src/main/protobuf/BitControl.proto
+++ b/protocol/src/main/protobuf/BitControl.proto
@@ -14,25 +14,25 @@ enum RpcType {
HANDSHAKE = 0;
ACK = 1;
GOODBYE = 2;
-
+
// bit requests
- REQ_INIATILIZE_FRAGMENTS = 3; // Returns Handle
-
+ REQ_INITIALIZE_FRAGMENTS = 3; // Returns Handle
+
REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
REQ_RECEIVER_FINISHED = 7;
REQ_FRAGMENT_STATUS = 8; // send a fragment status, return Ack
REQ_BIT_STATUS = 9; // get bit status.
REQ_QUERY_STATUS = 10;
REQ_QUERY_CANCEL = 15;
-
- // bit responses
+ REQ_UNPAUSE_FRAGMENT = 16; // send a resume message for a fragment, returns Ack
+
+ // bit responses
RESP_FRAGMENT_HANDLE = 11;
RESP_FRAGMENT_STATUS = 12;
RESP_BIT_STATUS = 13;
RESP_QUERY_STATUS = 14;
}
-
message BitControlHandshake{
optional int32 rpc_version = 1;
optional exec.shared.RpcChannel channel = 2 [default = BIT_CONTROL];
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index 185a646..ceed3d8 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -17,6 +17,7 @@ enum RpcType {
RUN_QUERY = 3; // user is submitting a new query to the drillbit
CANCEL_QUERY = 4; // user is sending a query cancellation request to the drillbit
REQUEST_RESULTS = 5;
+ RESUME_PAUSED_QUERY = 11; // user is sending a query resume request to the drillbit
// bit to user
QUERY_DATA = 6; // drillbit is sending a query result data batch to the user
[2/3] drill git commit: DRILL-2697: Pauses sites wait indefinitely
for a resume signal DrillClient sends a resume signal to UserServer.
UserServer triggers a resume call in the correct Foreman. Foreman resumes all
pauses related to the query through the
Posted by ve...@apache.org.
DRILL-2697: Pause sites wait indefinitely for a resume signal. DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the Control layer.
+ Better error messages and more tests in TestDrillbitResilience and TestPauseInjection
+ Added execution controls to operator context
+ Removed ControlMessageHandler interface, renamed ControlHandlerImpl to ControlMessageHandler
+ Added CountDownLatchInjection, useful in cases like PartitionedSender that spawns multiple threads
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f8e5e613
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f8e5e613
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f8e5e613
Branch: refs/heads/master
Commit: f8e5e613ff001da79377caea9c8e453e5619499a
Parents: 4e59633
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Thu Apr 30 13:27:08 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun May 10 12:24:44 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/client/DrillClient.java | 10 +-
.../apache/drill/exec/ops/OperatorContext.java | 3 +
.../drill/exec/ops/OperatorContextImpl.java | 8 +
.../drill/exec/physical/impl/ScreenCreator.java | 6 +-
.../exec/rpc/control/ControlRpcConfig.java | 4 +-
.../drill/exec/rpc/control/ControlTunnel.java | 20 +-
.../drill/exec/rpc/user/UserRpcConfig.java | 3 +-
.../apache/drill/exec/rpc/user/UserServer.java | 9 +
.../drill/exec/store/pojo/PojoRecordReader.java | 16 +-
.../exec/testing/CountDownLatchInjection.java | 51 ++++
.../testing/CountDownLatchInjectionImpl.java | 85 ++++++
.../drill/exec/testing/ExecutionControls.java | 37 ++-
.../exec/testing/ExecutionControlsInjector.java | 13 +-
.../apache/drill/exec/testing/Injection.java | 6 +-
.../exec/testing/NoOpControlsInjector.java | 36 ++-
.../drill/exec/testing/PauseInjection.java | 33 +-
.../org/apache/drill/exec/work/WorkManager.java | 3 +-
.../exec/work/batch/ControlHandlerImpl.java | 197 ------------
.../exec/work/batch/ControlMessageHandler.java | 195 +++++++++++-
.../apache/drill/exec/work/foreman/Foreman.java | 25 +-
.../drill/exec/work/foreman/QueryManager.java | 44 ++-
.../exec/work/fragment/FragmentExecutor.java | 15 +-
.../exec/work/fragment/FragmentManager.java | 6 +
.../work/fragment/NonRootFragmentManager.java | 5 +
.../exec/work/fragment/RootFragmentManager.java | 9 +-
.../apache/drill/exec/work/user/UserWorker.java | 8 +
.../exec/server/TestDrillbitResilience.java | 303 +++++++++++--------
.../testing/TestCountDownLatchInjection.java | 155 ++++++++++
.../drill/exec/testing/TestPauseInjection.java | 146 ++++++++-
.../org/apache/drill/exec/proto/BitControl.java | 48 ++-
.../org/apache/drill/exec/proto/UserProtos.java | 46 ++-
.../apache/drill/exec/proto/beans/RpcType.java | 2 +
protocol/src/main/protobuf/BitControl.proto | 12 +-
protocol/src/main/protobuf/User.proto | 1 +
34 files changed, 1106 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 136d8c7..c642c4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -312,10 +312,18 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
- logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+ if(logger.isDebugEnabled()) {
+ logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+ }
return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
}
+ public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId));
+ }
+ return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
+ }
/**
* Submits a Logical plan for direct execution (bypasses parsing)
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 7cc52ba..35139d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
public abstract class OperatorContext {
@@ -36,6 +37,8 @@ public abstract class OperatorContext {
public abstract OperatorStats getStats();
+ public abstract ExecutionControls getExecutionControls();
+
public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator();
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 6dbd880..9fa8867 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -24,11 +24,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import org.apache.drill.exec.testing.ExecutionControls;
class OperatorContextImpl extends OperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
private final BufferAllocator allocator;
+ private final ExecutionControls executionControls;
private boolean closed = false;
private PhysicalOperator popConfig;
private OperatorStats stats;
@@ -42,6 +44,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
this.stats = context.getStats().getOperatorStats(def, allocator);
+ executionControls = context.getExecutionControls();
}
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
@@ -49,6 +52,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
this.popConfig = popConfig;
this.stats = stats;
+ executionControls = context.getExecutionControls();
}
public DrillBuf replace(DrillBuf old, int newSize) {
@@ -70,6 +74,10 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return newBuf;
}
+ public ExecutionControls getExecutionControls() {
+ return executionControls;
+ }
+
public BufferAllocator getAllocator() {
if (allocator == null) {
throw new UnsupportedOperationException("Operator context does not have an allocator");
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c31de66..76dc91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -52,7 +52,6 @@ public class ScreenCreator implements RootCreator<Screen>{
static class ScreenRoot extends BaseRootExec {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming;
private final FragmentContext context;
private final AccountingUserConnection userConnection;
@@ -136,6 +135,11 @@ public class ScreenCreator implements RootCreator<Screen>{
}
+ @Override
+ public void close() throws Exception {
+ injector.injectPause(context.getExecutionControls(), "send-complete", logger);
+ super.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index f92bb49..0cfa56e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -40,16 +40,18 @@ public class ControlRpcConfig {
.name("CONTROL")
.timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
.add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
- .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+ .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.build();
}
public static int RPC_VERSION = 3;
public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+ public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index a4f9fdf..16b9b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,12 +17,9 @@
*/
package org.apache.drill.exec.rpc.control;
-import java.util.Collection;
-
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -56,7 +53,12 @@ public class ControlTunnel {
}
public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
- CancelFragment b = new CancelFragment(outcomeListener, handle);
+ final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_CANCEL_FRAGMENT);
+ manager.runCommand(b);
+ }
+
+ public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
+ final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
manager.runCommand(b);
}
@@ -114,17 +116,19 @@ public class ControlTunnel {
}
}
- public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
+ public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
final FragmentHandle handle;
+ final RpcType type;
- public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+ public SignalFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle, RpcType type) {
super(listener);
this.handle = handle;
+ this.type = type;
}
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
}
}
@@ -139,7 +143,7 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class);
+ connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index ae728d8..3f8122d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -36,11 +36,12 @@ public class UserRpcConfig {
return RpcConfig.newBuilder()
.name("USER")
.timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
- .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+ .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
.add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
.add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
.add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+ .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index b3b7ae9..72b07ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -113,6 +113,15 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
throw new RpcException("Failure while decoding QueryId body.", e);
}
+ case RpcType.RESUME_PAUSED_QUERY_VALUE:
+ try {
+ final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ final Ack ack = worker.resumeQuery(queryId);
+ return new Response(RpcType.ACK, ack);
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding QueryId body.", e);
+ }
+
default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index cf98b83..a893da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -42,13 +42,16 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
public class PojoRecordReader<T> extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(PojoRecordReader.class);
public final int forJsonIgnore = 1;
@@ -64,16 +67,9 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
this.iterator = iterator;
}
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
-
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ operatorContext = context;
try {
Field[] fields = pojoClass.getDeclaredFields();
List<PojoWriter> writers = Lists.newArrayList();
@@ -147,7 +143,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
@Override
public int next() {
boolean allocated = false;
-
+ injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
try {
int i =0;
outside:
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
new file mode 100644
index 0000000..de4a181
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * This interface is used internally for tracking injected countdown latches. These latches are specified via the
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * This injection is useful in the case where a thread spawns multiple threads. The parent thread initializes the latch
+ * with the expected number of countdowns and awaits. The child threads count down on the same latch (same site class
+ * and same descriptor), and once there are enough, the parent thread continues.
+ */
+public interface CountDownLatchInjection {
+
+ /**
+ * Initializes the underlying latch.
+ * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
+ */
+ void initialize(final int count);
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to zero, unless the thread is
+ * {@link Thread#interrupt interrupted}, in which case an {@link InterruptedException} is thrown.
+ */
+ void await() throws InterruptedException;
+
+ /**
+ * Await without interruption. In the case of interruption, log a warning and continue to wait.
+ */
+ void awaitUninterruptibly();
+
+ /**
+ * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
+ */
+ void countDown();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
new file mode 100644
index 0000000..f4012c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * See {@link org.apache.drill.exec.testing.CountDownLatchInjection}. Behaves like
+ * {@link org.apache.drill.exec.testing.PauseInjection#pause} when initialized with a count of one (non-positive counts
+ * are rejected by {@link #initialize}). In any case, this injection provides more control than PauseInjection.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class CountDownLatchInjectionImpl extends Injection implements CountDownLatchInjection {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountDownLatchInjectionImpl.class);
+
+ private ExtendedLatch latch = null; // set by initialize(); null until then
+
+ @JsonCreator // ensures instances are created only through JSON
+ private CountDownLatchInjectionImpl(@JsonProperty("address") final String address,
+ @JsonProperty("port") final int port,
+ @JsonProperty("siteClass") final String siteClass,
+ @JsonProperty("desc") final String desc) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, 0, 1); // nSkip = 0, nFire = 1
+ }
+
+ @Override
+ protected boolean injectNow() { // a latch always applies; the skip/fire counters are bypassed
+ return true;
+ }
+
+ @Override
+ public void initialize(final int count) {
+ Preconditions.checkArgument(latch == null, "Latch can be initialized only once at %s in %s.", desc,
+ siteClass.getSimpleName());
+ Preconditions.checkArgument(count > 0, "Count has to be a positive integer at %s in %s.", desc,
+ siteClass.getSimpleName());
+ latch = new ExtendedLatch(count);
+ }
+
+ @Override
+ public void await() throws InterruptedException {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ try {
+ latch.await();
+ } catch (final InterruptedException e) {
+ logger.warn("Interrupted while awaiting in {} at {}.", siteClass.getSimpleName(), desc); // SLF4J uses {}, not %s
+ throw e;
+ }
+ }
+
+ @Override
+ public void awaitUninterruptibly() {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ latch.awaitUninterruptibly();
+ }
+
+ @Override
+ public void countDown() {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
+ latch.countDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 1171bf8..639802f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -50,13 +50,15 @@ public final class ExecutionControls {
controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
}
- // Jackson MixIn for all types of injections
+ // Jackson MixIn: an annotated class that is used only by Jackson's ObjectMapper to allow a list of injections to
+ // hold various types of injections
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@Type(value = ExceptionInjection.class, name = "exception"),
+ @Type(value = CountDownLatchInjectionImpl.class, name = "latch"),
@Type(value = PauseInjection.class, name = "pause")})
public static abstract class InjectionMixIn {
}
@@ -99,7 +101,7 @@ public final class ExecutionControls {
final String jsonString = v.string_val;
try {
controlsOptionMapper.readValue(jsonString, Controls.class);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
}
}
@@ -137,7 +139,7 @@ public final class ExecutionControls {
final Controls controls;
try {
controls = controlsOptionMapper.readValue(opString, Controls.class);
- } catch (IOException e) {
+ } catch (final IOException e) {
// This never happens. opString must have been validated.
logger.warn("Could not parse injections. Injections must have been validated before this point.");
throw new DrillRuntimeException("Could not parse injections.", e);
@@ -153,7 +155,7 @@ public final class ExecutionControls {
}
/**
- * Look for an exception injection matching the given injector, site description and endpoint.
+ * Look for an exception injection matching the given injector, site descriptor, and endpoint.
*
* @param injector the injector, which indicates a class
* @param desc the injection site description
@@ -165,7 +167,7 @@ public final class ExecutionControls {
}
/**
- * Look for an pause injection matching the given injector, site description and endpoint.
+ * Look for a pause injection matching the given injector, site descriptor, and endpoint.
*
* @param injector the injector, which indicates a class
* @param desc the injection site description
@@ -176,6 +178,20 @@ public final class ExecutionControls {
return injection != null ? (PauseInjection) injection : null;
}
+ /**
+ * Look for a count down latch injection matching the given injector, site descriptor, and endpoint.
+ *
+ * @param injector the injector, which indicates a class
+ * @param desc the injection site description
+ * @return the count down latch injection, if there is one for the injector, site and endpoint;
+ * otherwise, {@link NoOpControlsInjector#LATCH}, a latch that does nothing
+ */
+ public CountDownLatchInjection lookupCountDownLatchInjection(final ExecutionControlsInjector injector,
+ final String desc) {
+ final Injection injection = lookupInjection(injector, desc); // null when no injection applies to this site
+ return injection != null ? (CountDownLatchInjection) injection : NoOpControlsInjector.LATCH; // NOTE(review): unchecked cast; a non-latch injection registered for this site would throw ClassCastException
+ }
+
private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
if (controls.isEmpty()) {
return null;
@@ -190,4 +206,15 @@ public final class ExecutionControls {
// return only if injection was meant for this drillbit
return injection.isValidForBit(endpoint) ? injection : null;
}
+
+ /**
+ * Resumes all pause injections held by these controls, i.e. within the current context (QueryContext or FragmentContext).
+ */
+ public void unpauseAll() {
+ for (final Injection injection : controls.values()) {
+ if (injection instanceof PauseInjection) { // other injection types (exception, latch) are left untouched
+ ((PauseInjection) injection).unpause();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 4b1cd0c..05f8433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -75,12 +75,11 @@ public class ExecutionControlsInjector {
* @param desc the site description
* throws the exception specified by the injection, if it is time
*/
- public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+ public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
if (exceptionInjection != null) {
exceptionInjection.throwUnchecked();
}
- return this;
}
/**
@@ -95,13 +94,12 @@ public class ExecutionControlsInjector {
* @param exceptionClass the expected class of the exception (or a super class of it)
* @throws T the exception specified by the injection, if it is time
*/
- public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ public <T extends Throwable> void injectChecked(
final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
if (exceptionInjection != null) {
exceptionInjection.throwChecked(exceptionClass);
}
- return this;
}
/**
@@ -114,7 +112,7 @@ public class ExecutionControlsInjector {
* @param desc the site description
* @param logger logger of the class containing the injection site
*/
- public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ public void injectPause(final ExecutionControls executionControls, final String desc,
final Logger logger) {
final PauseInjection pauseInjection =
executionControls.lookupPauseInjection(this, desc);
@@ -124,6 +122,9 @@ public class ExecutionControlsInjector {
pauseInjection.pause();
logger.debug("Resuming at {}", desc);
}
- return this;
+ }
+
+ public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+ return executionControls.lookupCountDownLatchInjection(this, desc);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index 96fed3a..08ade51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -28,8 +28,8 @@ public abstract class Injection {
protected final String address; // the address of the drillbit on which to inject
protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
- private final Class<?> siteClass; // the class where the injection should happen
- private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+ protected final Class<?> siteClass; // the class where the injection should happen
+ protected final String desc; // description of the injection site; useful for multiple exception injections in a single class
private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
private final AtomicInteger nFire; // the number of times to do the injection, after any skips; starts > 0
@@ -64,7 +64,7 @@ public abstract class Injection {
*
* @return if the injection should be injected now
*/
- protected final boolean injectNow() {
+ protected boolean injectNow() {
return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 80d9790..33ab783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.testing;
import org.slf4j.Logger;
/**
- * An injector that does not inject any controls.
+ * An injector that does not inject any controls, useful when not testing (i.e. assertions are not enabled).
*/
public final class NoOpControlsInjector extends ExecutionControlsInjector {
@@ -29,20 +29,42 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
}
@Override
- public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
- return this;
+ public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
}
@Override
- public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ public <T extends Throwable> void injectChecked(
final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
- return this;
}
@Override
- public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ public void injectPause(final ExecutionControls executionControls, final String desc,
final Logger logger) {
- return this;
}
+ /**
+ * A count down latch whose operations all do nothing. It is injected when assertions are not enabled, and it is
+ * also the fallback returned by ExecutionControls#lookupCountDownLatchInjection when no injection matches.
+ */
+ public static final CountDownLatchInjection LATCH = new CountDownLatchInjection() {
+ @Override
+ public void initialize(int count) {
+ }
+
+ @Override
+ public void await() {
+ }
+
+ @Override
+ public void awaitUninterruptibly() {
+ }
+
+ @Override
+ public void countDown() {
+ }
+ };
+
+ @Override
+ public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+ return LATCH;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index e5f9c9c..ff0340b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -21,43 +21,40 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.DrillRuntimeException;
/**
- * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
- * injected pauses; these pauses are specified via
+ * Injection for a single pause. Pause indefinitely until signalled. This class is used internally for tracking
+ * injected pauses. Note that pauses can be fired only once; nFire field is ignored. These pauses are specified via
* {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
*
- * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ * After the pauses are set, the user sends another signal to unpause all the pauses. This triggers the Foreman to
+ * 1) unpause all pauses in QueryContext, and
+ * 2) send an unpause signal to all fragments, each of which unpauses all pauses in FragmentContext.
*/
@JsonAutoDetect(fieldVisibility = Visibility.ANY)
public class PauseInjection extends Injection {
- private final long millis;
+ private final ExtendedLatch latch = new ExtendedLatch(1);
@JsonCreator // ensures instances are created only through JSON
private PauseInjection(@JsonProperty("address") final String address,
@JsonProperty("port") final int port,
@JsonProperty("siteClass") final String siteClass,
@JsonProperty("desc") final String desc,
- @JsonProperty("nSkip") final int nSkip,
- @JsonProperty("nFire") final int nFire,
- @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
- super(address, port, siteClass, desc, nSkip, nFire);
- if (millis <= 0) {
- throw new InjectionConfigurationException("Pause millis is non-positive.");
- }
- this.millis = millis;
+ @JsonProperty("nSkip") final int nSkip) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, nSkip, 1);
}
public void pause() {
- if (! injectNow()) {
+ if (!injectNow()) {
return;
}
- try {
- Thread.sleep(millis);
- } catch (InterruptedException e) {
- throw new DrillRuntimeException("Well, I should be sleeping.");
- }
+ latch.awaitUninterruptibly();
+ }
+
+ public void unpause() {
+ latch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 3e4f3d1..f2352e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.QueryManager;
@@ -116,7 +115,7 @@ public class WorkManager implements AutoCloseable {
// TODO references to this escape here (via WorkerBee) before construction is done
- controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
+ controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
statusThread = new StatusThread();
dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
deleted file mode 100644
index b6c6852..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-import static org.apache.drill.exec.rpc.RpcBus.get;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConstants;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.UserRpcException;
-import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-
-public class ControlHandlerImpl implements ControlMessageHandler {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
- private final WorkerBee bee;
-
- public ControlHandlerImpl(final WorkerBee bee) {
- this.bee = bee;
- }
-
- @Override
- public Response handle(final ControlConnection connection, final int rpcType,
- final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Received bit com message of type {}", rpcType);
- }
-
- switch (rpcType) {
-
- case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
- final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
- cancelFragment(handle);
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
- final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
- receivingFragmentFinished(finishedReceiver);
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
- // TODO: Support a type of message that has no response.
- return DataRpcConfig.OK;
-
- case RpcType.REQ_QUERY_CANCEL_VALUE: {
- final QueryId queryId = get(pBody, QueryId.PARSER);
- final Foreman foreman = bee.getForemanForQueryId(queryId);
- if (foreman != null) {
- foreman.cancel();
- return DataRpcConfig.OK;
- } else {
- return DataRpcConfig.FAIL;
- }
- }
-
- case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
- final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
- for(int i = 0; i < fragments.getFragmentCount(); i++) {
- startNewRemoteFragment(fragments.getFragment(i));
- }
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_QUERY_STATUS_VALUE: {
- final QueryId queryId = get(pBody, QueryId.PARSER);
- final Foreman foreman = bee.getForemanForQueryId(queryId);
- if (foreman == null) {
- throw new RpcException("Query not running on node.");
- }
- final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
- return new Response(RpcType.RESP_QUERY_STATUS, profile);
- }
-
- default:
- throw new RpcException("Not yet supported.");
- }
- }
-
- @Override
- public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
- logger.debug("Received remote fragment start instruction", fragment);
-
- final DrillbitContext drillbitContext = bee.getContext();
- try {
- // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
- if (fragment.getLeafFragment()) {
- final FragmentContext context = new FragmentContext(drillbitContext, fragment,
- drillbitContext.getFunctionImplementationRegistry());
- final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
- final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
- fragment.getFragmentJson());
- final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
- bee.addFragmentRunner(fr);
- } else {
- // isIntermediate, store for incoming data.
- final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
- drillbitContext.getWorkBus().addFragmentManager(manager);
- }
-
- } catch (final Exception e) {
- throw new UserRpcException(drillbitContext.getEndpoint(),
- "Failure while trying to start remote fragment", e);
- } catch (final OutOfMemoryError t) {
- if (t.getMessage().startsWith("Direct buffer")) {
- throw new UserRpcException(drillbitContext.getEndpoint(),
- "Out of direct memory while trying to start remote fragment", t);
- } else {
- throw t;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
- */
- @Override
- public Ack cancelFragment(final FragmentHandle handle) {
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
- if (manager != null) {
- // try remote fragment cancel.
- manager.cancel();
- } else {
- // then try local cancel.
- final FragmentExecutor runner = bee.getFragmentRunner(handle);
- if (runner != null) {
- runner.cancel();
- }
- }
-
- return Acks.OK;
- }
-
- private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
-
- final FragmentManager manager =
- bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
-
- FragmentExecutor executor;
- if (manager != null) {
- manager.receivingFragmentFinished(finishedReceiver.getReceiver());
- } else {
- // then try local cancel.
- executor = bee.getFragmentRunner(finishedReceiver.getSender());
- if (executor != null) {
- executor.receivingFragmentFinished(finishedReceiver.getReceiver());
- } else {
- logger.warn(
- "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
- QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
- QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
- }
- }
-
- return Acks.OK;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index c5d78cc..d12e6d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,24 +17,207 @@
*/
package org.apache.drill.exec.work.batch;
+import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
+import org.apache.drill.exec.rpc.control.ControlRpcConfig;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-public interface ControlMessageHandler {
+public class ControlMessageHandler {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
+ private final WorkerBee bee;
- public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
- throws RpcException;
+ public ControlMessageHandler(final WorkerBee bee) {
+ this.bee = bee;
+ }
- public abstract void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException;
+ public Response handle(final ControlConnection connection, final int rpcType,
+ final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Received bit com message of type {}", rpcType);
+ }
- public abstract Ack cancelFragment(FragmentHandle handle);
+ switch (rpcType) {
+ case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ cancelFragment(handle);
+ return ControlRpcConfig.OK;
+ }
-}
\ No newline at end of file
+ case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+ final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+ receivingFragmentFinished(finishedReceiver);
+ return ControlRpcConfig.OK;
+ }
+
+ case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+ bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
+ // TODO: Support a type of message that has no response.
+ return ControlRpcConfig.OK;
+
+ case RpcType.REQ_QUERY_CANCEL_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.cancel();
+ return ControlRpcConfig.OK;
+ } else {
+ return ControlRpcConfig.FAIL;
+ }
+ }
+
+ case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+ final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+ for(int i = 0; i < fragments.getFragmentCount(); i++) {
+ startNewRemoteFragment(fragments.getFragment(i));
+ }
+ return ControlRpcConfig.OK;
+ }
+
+ case RpcType.REQ_QUERY_STATUS_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman == null) {
+ throw new RpcException("Query not running on node.");
+ }
+ final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+ return new Response(RpcType.RESP_QUERY_STATUS, profile);
+ }
+
+ case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ resumeFragment(handle);
+ return ControlRpcConfig.OK;
+ }
+
+ default:
+ throw new RpcException("Not yet supported.");
+ }
+ }
+
+ private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
+ logger.debug("Received remote fragment start instruction", fragment);
+
+ final DrillbitContext drillbitContext = bee.getContext();
+ try {
+ // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+ if (fragment.getLeafFragment()) {
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment,
+ drillbitContext.getFunctionImplementationRegistry());
+ final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+ final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+ final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+ fragment.getFragmentJson());
+ final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+ bee.addFragmentRunner(fr);
+ } else {
+ // isIntermediate, store for incoming data.
+ final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+ drillbitContext.getWorkBus().addFragmentManager(manager);
+ }
+
+ } catch (final Exception e) {
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Failure while trying to start remote fragment", e);
+ } catch (final OutOfMemoryError t) {
+ if (t.getMessage().startsWith("Direct buffer")) {
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Out of direct memory while trying to start remote fragment", t);
+ } else {
+ throw t;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
+ */
+ private Ack cancelFragment(final FragmentHandle handle) {
+ // cancel a pending fragment
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ if (manager != null) {
+ manager.cancel();
+ return Acks.OK;
+ }
+
+ // cancel a running fragment
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
+ if (runner != null) {
+ runner.cancel();
+ return Acks.OK;
+ }
+
+ // fragment completed or does not exist
+ logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+ return Acks.OK;
+ }
+
+ private Ack resumeFragment(final FragmentHandle handle) {
+ // resume a pending fragment
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ if (manager != null) {
+ manager.unpause();
+ return Acks.OK;
+ }
+
+ // resume a paused fragment
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
+ if (runner != null) {
+ runner.unpause();
+ return Acks.OK;
+ }
+
+ // fragment completed or does not exist
+ logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+ return Acks.OK;
+ }
+
+ private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
+ final FragmentManager manager =
+ bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+
+ FragmentExecutor executor;
+ if (manager != null) {
+ manager.receivingFragmentFinished(finishedReceiver.getReceiver());
+ } else {
+ executor = bee.getFragmentRunner(finishedReceiver.getSender());
+ if (executor != null) {
+ executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+ } else {
+ logger.warn(
+ "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+ QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+ QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+ }
+ }
+
+ return Acks.OK;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4d403b8..0122ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -119,6 +119,7 @@ public class Foreman implements Runnable {
private final DrillbitContext drillbitContext;
private final UserClientConnection initiatingClient; // used to send responses
private volatile QueryState state;
+ private boolean resume = false;
private volatile DistributedLease lease; // used to limit the number of concurrent queries
@@ -196,6 +197,18 @@ public class Foreman implements Runnable {
}
/**
+ * Resume the query. Regardless of the current state, this method sends a resume signal to all fragments.
+ * This method can be called multiple times.
+ */
+ public void resume() {
+ resume = true;
+ // resume all pauses through query context
+ queryContext.getExecutionControls().unpauseAll();
+ // resume all pauses through all fragment contexts
+ queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+ }
+
+ /**
* Called by execution pool to do query setup, and kick off remote execution.
*
* <p>Note that completion of this function is not the end of the Foreman's role
@@ -268,9 +281,20 @@ public class Foreman implements Runnable {
* If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
* make sure that we can't make things any worse as those events are delivered, but allow
* any necessary remaining cleanup to proceed.
+ *
+ * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
+ * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
+ * to accept events.
*/
acceptExternalEvents.countDown();
+ // If we received the resume signal before fragments are setup, the first call does not actually resume the
+ // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+ if(resume) {
+ resume();
+ }
+ injector.injectPause(queryContext.getExecutionControls(), "foreman-ready", logger);
+
// restore the thread's original name
currentThread.setName(originalName);
}
@@ -375,7 +399,6 @@ public class Foreman implements Runnable {
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
logger.debug("Submitting fragments to run.");
- injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
// set up the root fragment first so we'll have incoming buffers available.
setupRootFragment(rootPlanFragment, work.getRootOperator());
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index c4646bd..090a377 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -189,7 +189,8 @@ public class QueryManager {
final DrillbitEndpoint endpoint = data.getEndpoint();
final FragmentHandle handle = data.getHandle();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
- controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+ controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+ SignalListener.Signal.CANCEL), handle);
}
break;
@@ -203,25 +204,52 @@ public class QueryManager {
}
}
+ /**
+ * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
+ * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
+ * control tunnel.
+ */
+ void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ if (rootRunner != null) {
+ rootRunner.unpause();
+ }
+ final Controller controller = drillbitContext.getController();
+ for(final FragmentData data : fragmentDataSet) {
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ final FragmentHandle handle = data.getHandle();
+ controller.getTunnel(endpoint).resumeFragment(new SignalListener(endpoint, handle,
+ SignalListener.Signal.UNPAUSE), handle);
+ }
+ }
+
/*
* This assumes that the FragmentStatusListener implementation takes action when it hears
- * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+ * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
* but log messages.
*/
- private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
- public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+ private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
+ /**
+ * An enum of possible signals that {@link SignalListener} listens to.
+ */
+ public static enum Signal { CANCEL, UNPAUSE }
+
+ private final Signal signal;
+
+ public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
super(endpoint, handle);
+ this.signal = signal;
}
@Override
public void failed(final RpcException ex) {
- logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+ logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", signal, value, endpoint, ex);
}
@Override
- public void success(final Ack value, final ByteBuf buf) {
- if (!value.getOk()) {
- logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+ public void success(final Ack ack, final ByteBuf buf) {
+ if (!ack.getOk()) {
+ logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
+ ack);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7baafc4..24e2556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -108,7 +108,7 @@ public class FragmentExecutor implements Runnable {
/**
* Cancel the execution of this fragment is in an appropriate state. Messages come from external.
- * NOTE that this can be called from threads *other* than the one running this runnable(),
+ * NOTE that this will be called from threads *other* than the one running this runnable(),
* so we need to be careful about the state transitions that can result.
*/
public void cancel() {
@@ -140,11 +140,18 @@ public class FragmentExecutor implements Runnable {
}
/**
+ * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
+ * the one running this runnable(). Also, this method can be called multiple times.
+ */
+ public synchronized void unpause() {
+ fragmentContext.getExecutionControls().unpauseAll();
+ }
+
+ /**
* Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
* called in the case that a limit query is executed.
*
- * @param handle
- * The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+ * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
*/
public void receivingFragmentFinished(final FragmentHandle handle) {
acceptExternalEvents.awaitUninterruptibly();
@@ -277,6 +284,8 @@ public class FragmentExecutor implements Runnable {
// first close the operators and release all memory.
try {
+ // Say executor was cancelled before setup. Now when executor actually runs, root is not initialized, but this
+ // method is called in finally. So root can be null.
if (root != null) {
root.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 0ba91b4..ad880da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -49,6 +49,12 @@ public interface FragmentManager {
public abstract void cancel();
+ /**
+ * If the executor is paused (for testing), this method should unpause the executor. This method should handle
+ * multiple calls.
+ */
+ public abstract void unpause();
+
public boolean isWaiting();
public abstract FragmentHandle getHandle();
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index f526fbe..ca5d5b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -112,6 +112,11 @@ public class NonRootFragmentManager implements FragmentManager {
}
@Override
+ public void unpause() {
+ runner.unpause();
+ }
+
+ @Override
public FragmentHandle getHandle() {
return fragment.getHandle();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b1c3fe0..67ef9b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -29,8 +29,8 @@ import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.work.batch.IncomingBuffers;
// TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+public class RootFragmentManager implements FragmentManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
@@ -70,6 +70,11 @@ public class RootFragmentManager implements FragmentManager{
}
@Override
+ public void unpause() {
+ runner.unpause();
+ }
+
+ @Override
public boolean isWaiting() {
return !buffers.isDone() && !cancel;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f8e5e613/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 8854ef3..e8deb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -68,6 +68,14 @@ public class UserWorker{
return Acks.OK;
}
+ public Ack resumeQuery(final QueryId queryId) {
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.resume();
+ }
+ return Acks.OK;
+ }
+
public OptionManager getSystemOptions() {
return bee.getContext().getOptionManager();
}
[3/3] drill git commit: DRILL-2755: Use and handle
InterruptedException during query processing.
Posted by ve...@apache.org.
DRILL-2755: Use and handle InterruptedException during query processing.
- Interrupt FragmentExecutor thread as part of FragmentExecutor.cancel()
- Handle InterruptedException in ExternalSortBatch.newSV2(). If the fragment status says it
should not continue, then throw the InterruptedException to the caller, which returns IterOutcome.STOP
- Add comments regarding not handling InterruptedException in SendingAccountor.waitForSendComplete()
- Handle InterruptedException in OrderedPartitionRecordBatch.getPartitionVectors()
If interrupted in Thread.sleep calls and fragment status says should not run, then
return IterOutcome.STOP downstream.
- Interrupt partitioner threads if PartitionerRecordBatch is interrupted while waiting for
partitioner threads to complete.
- Preserve interrupt status if not handled
- Handle null RecordBatches returned by RawBatchBuffer.getNext() in MergingRecordBatch.buildSchema()
- Change timeout in Foreman to be proportional to the number of intermediate fragments sent instead
of hard coded limit of 90s.
- Change TimedRunnable to enforce a timeout of 15s per runnable.
Total timeout is (5s * numOfRunnableTasks) / parallelism.
- Add unit tests
* Testing cancelling a query interrupts the query fragments which are currently blocked
* Testing interrupting the partitioner sender which in turn interrupts its helper threads
* Testing TimedRunnable enforces timeout for the whole task list.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3a294abc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3a294abc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3a294abc
Branch: refs/heads/master
Commit: 3a294abcc51148e0e79096af5e6d3c45b7c19a9d
Parents: f8e5e61
Author: vkorukanti <ve...@gmail.com>
Authored: Wed May 6 09:34:25 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Sun May 10 12:24:45 2015 -0700
----------------------------------------------------------------------
.../drill/common/concurrent/ExtendedLatch.java | 21 +---
.../apache/drill/exec/ops/SendingAccountor.java | 13 +++
.../impl/mergereceiver/MergingRecordBatch.java | 19 +++
.../OrderedPartitionRecordBatch.java | 31 ++++-
.../partitionsender/PartitionerDecorator.java | 70 +++++++++--
.../partitionsender/PartitionerTemplate.java | 3 +-
.../UnorderedReceiverBatch.java | 22 +++-
.../physical/impl/xsort/ExternalSortBatch.java | 10 +-
.../exec/record/RawFragmentBatchProvider.java | 2 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 9 +-
.../org/apache/drill/exec/rpc/BasicServer.java | 7 +-
.../drill/exec/rpc/ReconnectingConnection.java | 7 +-
.../apache/drill/exec/rpc/RemoteConnection.java | 18 ++-
.../apache/drill/exec/rpc/data/DataTunnel.java | 14 ++-
.../org/apache/drill/exec/server/Drillbit.java | 4 +
.../apache/drill/exec/store/TimedRunnable.java | 52 +++++++--
.../exec/testing/CountDownLatchInjection.java | 5 +
.../testing/CountDownLatchInjectionImpl.java | 5 +
.../exec/testing/ExecutionControlsInjector.java | 28 +++++
.../exec/testing/NoOpControlsInjector.java | 4 +
.../drill/exec/testing/PauseInjection.java | 7 ++
.../org/apache/drill/exec/work/WorkManager.java | 24 ++--
.../exec/work/batch/SpoolingRawBatchBuffer.java | 9 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 8 +-
.../apache/drill/exec/work/foreman/Foreman.java | 14 ++-
.../exec/work/fragment/FragmentExecutor.java | 14 +++
.../exec/server/TestDrillbitResilience.java | 117 ++++++++++++++++++-
.../drill/exec/store/TestTimedRunnable.java | 103 ++++++++++++++++
.../drill/jdbc/impl/DrillResultSetImpl.java | 4 +
29 files changed, 546 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
index a75ac32..3e14f8a 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/ExtendedLatch.java
@@ -35,16 +35,6 @@ public class ExtendedLatch extends CountDownLatch {
}
/**
- * Returns whether or not interruptions should continue to be ignored. This can be overridden in subclasses to check a
- * state variable or similar.
- *
- * @return Whether awaitUninterruptibly() should continue ignoring interruptions.
- */
- protected boolean ignoreInterruptions() {
- return true;
- }
-
- /**
* Await without interruption for a given time.
* @param waitMillis
* Time in milliseconds to wait
@@ -68,8 +58,7 @@ public class ExtendedLatch extends CountDownLatch {
}
/**
- * Await without interruption. In the case of interruption, log a warning and continue to wait. This also checks the
- * output of ignoreInterruptions();
+ * Await without interruption. In the case of interruption, log a warning and continue to wait.
*/
public void awaitUninterruptibly() {
while (true) {
@@ -77,12 +66,8 @@ public class ExtendedLatch extends CountDownLatch {
await();
return;
} catch (final InterruptedException e) {
- if (ignoreInterruptions()) {
- // if we're still not ready, the while loop will cause us to wait again
- logger.warn("Interrupted while waiting for event latch.", e);
- } else {
- return;
- }
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting for event latch.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index 0cb5fbf..5db2cc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -42,13 +42,26 @@ class SendingAccountor {
public synchronized void waitForSendComplete() {
int waitForBatches = batchesSent.get();
+ boolean isInterrupted = false;
while(waitForBatches != 0) {
try {
wait.acquire(waitForBatches);
waitForBatches = batchesSent.addAndGet(-1 * waitForBatches);
} catch (InterruptedException e) {
+ // We should always wait for send complete. If we don't, we'll leak memory or have a memory miss when we try
+ // to send. This should be safe because: network connections should get disconnected and fail a send if a
+ // node goes down, otherwise, the receiving side connection should always consume from the rpc layer
+ // (blocking is cooperative and will get triggered before this)
logger.warn("Interrupted while waiting for send complete. Continuing to wait.", e);
+
+ isInterrupted = true;
}
}
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f19f371..5d990f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -70,6 +70,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
@@ -88,6 +89,7 @@ import com.sun.codemodel.JExpr;
*/
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(MergingRecordBatch.class);
private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
@@ -141,6 +143,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
stats.startWait();
final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try {
+ injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
final RawFragmentBatch b = provider.getNext();
if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
@@ -148,6 +151,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
inputCounts[providerIndex] += b.getHeader().getDef().getRecordCount();
}
return b;
+ } catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
} finally {
stats.stopWait();
}
@@ -359,6 +368,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(node.batchId);
}
+
assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
if (nextBatch == null && !context.shouldContinue()) {
@@ -461,6 +471,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
return;
}
final RawFragmentBatch batch = getNext(i);
+ if (batch == null) {
+ if (!context.shouldContinue()) {
+ state = BatchState.STOP;
+ } else {
+ state = BatchState.DONE;
+ }
+
+ break;
+ }
if (batch.getHeader().getDef().getFieldCount() == 0) {
i++;
continue;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 63b7eba..ca6d83c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -246,6 +246,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
/**
+ * Wait until at least the given timeout has expired, or until interrupted while the fragment status is not runnable.
+ * @param timeout Timeout in milliseconds.
+ * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+ */
+ private boolean waitUntilTimeOut(final long timeout) {
+ while(true) {
+ try {
+ Thread.sleep(timeout);
+ return true;
+ } catch (final InterruptedException e) {
+ if (!context.shouldContinue()) {
+ return false;
+ }
+ }
+ }
+ }
+
+ /**
* This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
* that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
* distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
@@ -255,10 +273,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @return True is successful. False if failed.
*/
private boolean getPartitionVectors() {
-
-
try {
-
if (!saveSamples()) {
return false;
}
@@ -279,14 +294,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// TODO: this should be polling.
if (val < fragmentsBeforeProceed) {
- Thread.sleep(10);
+ if (!waitUntilTimeOut(10)) {
+ return false;
+ }
}
for (int i = 0; i < 100 && finalTable == null; i++) {
finalTable = tableMap.get(finalTableKey);
if (finalTable != null) {
break;
}
- Thread.sleep(10);
+ if (!waitUntilTimeOut(10)) {
+ return false;
+ }
}
if (finalTable == null) {
buildTable();
@@ -302,7 +321,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
partitionVectors.add(w.getValueVector());
}
- } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
+ } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
kill(false);
context.fail(ex);
return false;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index c3261dc..c355070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -28,6 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.testing.CountDownLatchInjection;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
/**
* Decorator class to hide multiple Partitioner existence from the caller
@@ -38,19 +41,22 @@ import com.google.common.collect.Lists;
* totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
*/
public class PartitionerDecorator {
-
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(PartitionerDecorator.class);
private List<Partitioner> partitioners;
private final OperatorStats stats;
private final String tName;
private final String childThreadPrefix;
private final ExecutorService executor;
+ private final FragmentContext context;
public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
this.partitioners = partitioners;
this.stats = stats;
+ this.context = context;
this.executor = context.getDrillbitContext().getExecutor();
this.tName = Thread.currentThread().getName();
this.childThreadPrefix = "Partitioner-" + tName + "-";
@@ -145,17 +151,42 @@ public class PartitionerDecorator {
stats.startWait();
final CountDownLatch latch = new CountDownLatch(partitioners.size());
final List<CustomRunnable> runnables = Lists.newArrayList();
+ final List<Future> taskFutures = Lists.newArrayList();
+ CountDownLatchInjection testCountDownLatch = null;
try {
- int i = 0;
- for (final Partitioner part : partitioners ) {
- runnables.add(new CustomRunnable(childThreadPrefix, latch, iface, part));
- executor.submit(runnables.get(i++));
+ // To simulate interruption of the main fragment thread and interruption of the partitioner threads, create a
+ // CountDownLatchInjection latch. Partitioner threads await on the latch and the main fragment thread counts down
+ // or interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
+ testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
+ testCountDownLatch.initialize(1);
+ for (final Partitioner part : partitioners) {
+ final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
+ runnables.add(runnable);
+ taskFutures.add(executor.submit(runnable));
}
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ while (true) {
+ try {
+ // Wait for main fragment interruption.
+ injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+
+ // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
+ injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
+
+ latch.await();
+ break;
+ } catch (final InterruptedException e) {
+ // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
+ if (!context.shouldContinue()) {
+ for(Future f : taskFutures) {
+ f.cancel(true);
+ }
+
+ break;
+ }
+ }
}
+
IOException excep = null;
for (final CustomRunnable runnable : runnables ) {
IOException myException = runnable.getException();
@@ -180,8 +211,12 @@ public class PartitionerDecorator {
// scale down main stats wait time based on calculated processing time
// since we did not wait for whole duration of above execution
stats.adjustWaitNanos(-maxProcessTime);
- }
+ // Done with the latch, close it.
+ if (testCountDownLatch != null) {
+ testCountDownLatch.close();
+ }
+ }
}
/**
@@ -190,7 +225,7 @@ public class PartitionerDecorator {
* protected is for testing purposes
*/
protected interface GeneralExecuteIface {
- public void execute(Partitioner partitioner) throws IOException;
+ void execute(Partitioner partitioner) throws IOException;
}
/**
@@ -242,17 +277,28 @@ public class PartitionerDecorator {
private final CountDownLatch latch;
private final GeneralExecuteIface iface;
private final Partitioner part;
+ private CountDownLatchInjection testCountDownLatch;
+
private volatile IOException exp;
- public CustomRunnable(String parentThreadName, CountDownLatch latch, GeneralExecuteIface iface, Partitioner part) {
+ public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
+ final Partitioner part, CountDownLatchInjection testCountDownLatch) {
this.parentThreadName = parentThreadName;
this.latch = latch;
this.iface = iface;
this.part = part;
+ this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
+ // Test only - Pause until interrupted by fragment thread
+ try {
+ testCountDownLatch.await();
+ } catch (final InterruptedException e) {
+ logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+ }
+
final Thread currThread = Thread.currentThread();
final String currThreadName = currThread.getName();
final OperatorStats localStats = part.getStats();
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index cbea267..aeac01d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -285,7 +285,8 @@ public abstract class PartitionerTemplate implements Partitioner {
// to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
// sender has acknowledged the terminate request. After sending the last batch, all further batches are
// dropped.
- final boolean isLastBatch = isLast || terminated;
+ // 3. Partitioner thread is interrupted due to cancellation of fragment.
+ final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
// if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
if (!isLastBatch && recordCount == 0) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 66a2092..e40fe54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -49,9 +49,12 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class UnorderedReceiverBatch implements CloseableRecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+ private final static ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(UnorderedReceiverBatch.class);
private final RecordBatchLoader batchLoader;
private final RawFragmentBatchProvider fragProvider;
@@ -133,6 +136,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
return batchLoader.getValueAccessorById(clazz, ids);
}
+ private RawFragmentBatch getNextBatch() throws IOException {
+ try {
+ injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
+ return fragProvider.getNext();
+ } catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
+ }
+ }
+
@Override
public IterOutcome next() {
stats.startProcessing();
@@ -140,11 +156,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
RawFragmentBatch batch;
try {
stats.startWait();
- batch = fragProvider.getNext();
+ batch = getNextBatch();
// skip over empty batches. we do this since these are basically control messages.
while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
- batch = fragProvider.getNext();
+ batch = getNextBatch();
}
} finally {
stats.stopWait();
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index aab3391..3159811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -277,6 +277,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} else {
try {
sv2 = newSV2();
+ } catch(InterruptedException e) {
+ return IterOutcome.STOP;
} catch (OutOfMemoryException e) {
throw new OutOfMemoryRuntimeException(e);
}
@@ -496,7 +498,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return size;
}
- private SelectionVector2 newSV2() throws OutOfMemoryException {
+ private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
if (!sv2.allocateNew(incoming.getRecordCount())) {
try {
@@ -509,8 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
while (true) {
try {
Thread.sleep(waitTime * 1000);
- } catch (InterruptedException e) {
- throw new OutOfMemoryException(e);
+ } catch(final InterruptedException e) {
+ if (!context.shouldContinue()) {
+ throw e;
+ }
}
waitTime *= 2;
if (sv2.allocateNew(incoming.getRecordCount())) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index d4dfe96..030785c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.ops.FragmentContext;
public interface RawFragmentBatchProvider {
- public RawFragmentBatch getNext() throws IOException;
+ public RawFragmentBatch getNext() throws IOException, InterruptedException;
public void kill(FragmentContext context);
public void cleanup();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 1661f81..d551173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -263,9 +263,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
logger.debug("Closing client");
try {
connection.getChannel().close().get();
- } catch (InterruptedException | ExecutionException e) {
- logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
- // TODO InterruptedException
+ } catch (final InterruptedException | ExecutionException e) {
+ logger.warn("Failure while shutting {}", this.getClass().getName(), e);
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a148436..6a7bc65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -204,9 +204,12 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
public void close() throws IOException {
try {
eventLoopGroup.shutdownGracefully().get();
- } catch (InterruptedException | ExecutionException e) {
+ } catch (final InterruptedException | ExecutionException e) {
logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index 9948d3e..f0787a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -112,9 +112,12 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
cmd.connectionSucceeded(connection);
// logger.debug("Finished connection succeeded activity.");
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
cmd.connectionFailed(FailureType.CONNECTION, e);
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0f095c0..2ee9263 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -70,9 +70,13 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
try{
writeManager.waitForWritable();
return true;
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
listener.failed(new RpcException(e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
return false;
}
}
@@ -131,10 +135,14 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
if (channel.isActive()) {
channel.close().get();
}
- } catch (InterruptedException | ExecutionException e) {
+ channel.close().get();
+ } catch (final InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 11f5496..ed31bed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -48,9 +48,12 @@ public class DataTunnel {
try{
sendingSemaphore.acquire();
manager.runCommand(b);
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
@@ -59,9 +62,12 @@ public class DataTunnel {
try{
sendingSemaphore.acquire();
manager.runCommand(b);
- }catch(InterruptedException e){
+ }catch(final InterruptedException e){
b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
- // TODO InterruptedException
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
return b.getFuture();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index e7a9a3c..531253e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -271,6 +271,10 @@ public class Drillbit implements AutoCloseable {
Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
} catch (final InterruptedException e) {
logger.warn("Interrupted while sleeping during coordination deregistration.");
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
if (embeddedJetty != null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
index 0fb778b..e52820b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.store;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import com.google.common.base.Stopwatch;
@@ -36,6 +41,8 @@ import com.google.common.collect.Lists;
*/
public abstract class TimedRunnable<V> implements Runnable {
+ private static int TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
+
private volatile Exception e;
private volatile long timeNanos;
private volatile V value;
@@ -91,10 +98,13 @@ public abstract class TimedRunnable<V> implements Runnable {
}
/**
- * Execute the list of runnables with the given parallelization. At end, return values and report completion time stats to provided logger.
+ * Execute the list of runnables with the given parallelization. At end, return values and report completion time
+ * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout is exceeded, existing/pending
+ * tasks will be cancelled and a {@link UserException} is thrown.
* @param activity Name of activity for reporting in logger.
* @param logger The logger to use to report results.
- * @param runnables List of runnables that should be executed and timed. If this list has one item, task will be completed in-thread.
+ * @param runnables List of runnables that should be executed and timed. If this list has one item, task will be
+ * completed in-thread. Runnable must handle {@link InterruptedException}s.
* @param parallelism The number of threads that should be run to complete this task.
* @return The list of outcome objects.
* @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
@@ -107,25 +117,43 @@ public abstract class TimedRunnable<V> implements Runnable {
runnables.get(0).run();
}else{
parallelism = Math.min(parallelism, runnables.size());
- final CountDownLatch latch = new CountDownLatch(runnables.size());
+ final ExtendedLatch latch = new ExtendedLatch(runnables.size());
final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
try{
for(TimedRunnable<V> runnable : runnables){
threadPool.submit(new LatchedRunnable(latch, runnable));
}
- }finally{
- threadPool.shutdown();
- }
- try{
- latch.await();
- }catch(InterruptedException e){
- // TODO interrupted exception.
- throw new RuntimeException(e);
+ final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism);
+ if (!latch.awaitUninterruptibly(timeout)) {
+ // Issue a shutdown request. This will cause existing threads to interrupt and pending threads to cancel.
+ // It is highly important that the task Runnables are handling interrupts correctly.
+ threadPool.shutdownNow();
+
+ try {
+ // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+ // any running threads. If the runnables are handling the interrupts properly they should be able to
+ // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential
+ // thread leaks.
+ threadPool.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException e) {
+ logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
+ }
+
+ final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
+ "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
+ logger.error(errMsg);
+ throw UserException.resourceError()
+ .message(errMsg)
+ .build();
+ }
+ } finally {
+ if (!threadPool.isShutdown()) {
+ threadPool.shutdown();
+ }
}
}
-
List<V> values = Lists.newArrayList();
long sum = 0;
long max = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index de4a181..d26e2bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -48,4 +48,9 @@ public interface CountDownLatchInjection {
* Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
*/
void countDown();
+
+ /**
+ * Close the latch.
+ */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index f4012c1..561d816 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -82,4 +82,9 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
latch.countDown();
}
+
+ @Override
+ public void close() {
+ latch = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 05f8433..387d300 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -124,6 +124,34 @@ public class ExecutionControlsInjector {
}
}
+ /**
+ * Insert a pause that can be interrupted using {@link Thread#interrupt()} at the given site point, if such an
+ * injection is specified (i.e. matches the site description).
+ * <p/>
+ * <p>Implementors use this in their code at a site where they want to simulate an interruptible pause
+ * during testing.
+ *
+ * @param executionControls the controls in the current context
+ * @param desc the site description
+ * @param logger logger of the class containing the injection site
+ * @throws InterruptedException if interrupted using {@link Thread#interrupt()}
+ */
+ public void injectInterruptiblePause(final ExecutionControls executionControls, final String desc,
+ final Logger logger) throws InterruptedException {
+ final PauseInjection pauseInjection = executionControls.lookupPauseInjection(this, desc);
+
+ if (pauseInjection != null) {
+ logger.debug("Interruptible pausing at {}", desc);
+ try {
+ pauseInjection.interruptiblePause();
+ } catch (final InterruptedException e) {
+ logger.debug("Pause interrupted at {}", desc);
+ throw e;
+ }
+ logger.debug("Interruptible pause resuming at {}", desc);
+ }
+ }
+
public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
return executionControls.lookupCountDownLatchInjection(this, desc);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 33ab783..bb13d1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -61,6 +61,10 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
@Override
public void countDown() {
}
+
+ @Override
+ public void close() {
+ }
};
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index ff0340b..fc4d8ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -54,6 +54,13 @@ public class PauseInjection extends Injection {
latch.awaitUninterruptibly();
}
+ public void interruptiblePause() throws InterruptedException {
+ if (!injectNow()) {
+ return;
+ }
+ latch.await();
+ }
+
public void unpause() {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index f2352e6..1d3a0b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,13 +21,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.SelfCleaningRunnable;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -173,6 +173,10 @@ public class WorkManager implements AutoCloseable {
}
} catch (final InterruptedException e) {
logger.warn("Executor interrupted while awaiting termination");
+
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
}
}
@@ -180,7 +184,7 @@ public class WorkManager implements AutoCloseable {
return dContext;
}
- private CountDownLatch exitLatch = null; // used to wait to exit when things are still running
+ private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running
/**
* Waits until it is safe to exit. Blocks until all currently running fragments have completed.
@@ -193,17 +197,11 @@ public class WorkManager implements AutoCloseable {
return;
}
- exitLatch = new CountDownLatch(1);
+ exitLatch = new ExtendedLatch();
}
- while(true) {
- try {
- exitLatch.await(5, TimeUnit.SECONDS);
- } catch(final InterruptedException e) {
- // keep waiting
- }
- break;
- }
+ // Wait for at most 5 seconds or until the latch is released.
+ exitLatch.awaitUninterruptibly(5000);
}
/**
@@ -328,6 +326,10 @@ public class WorkManager implements AutoCloseable {
try {
Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
} catch(final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
// exit status thread on interrupt.
break;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 2a79e42..07a3505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -141,7 +141,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
}
@Override
- public RawFragmentBatch getNext() throws IOException {
+ public RawFragmentBatch getNext() throws IOException, InterruptedException {
if (outOfMemory && buffer.size() < 10) {
outOfMemory = false;
fragmentManager.setAutoRead(true);
@@ -160,9 +160,12 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
}
queueSize -= w.getBodySize();
return batch;
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
return null;
- // TODO InterruptedException
}
}
if (w == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index d23655c..4750666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -30,7 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
private static enum BufferState {
INIT,
@@ -149,7 +149,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
@Override
- public RawFragmentBatch getNext() {
+ public RawFragmentBatch getNext() throws IOException, InterruptedException {
if (outOfMemory.get() && buffer.size() < 10) {
logger.trace("Setting autoread true");
@@ -166,8 +166,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
try {
b = buffer.take();
} catch (final InterruptedException e) {
- return null;
- // TODO InterruptedException
+ logger.debug("Interrupted while waiting for incoming data.", e);
+ throw e;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0122ef8..bf62ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -109,7 +109,7 @@ public class Foreman implements Runnable {
private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
private static final ObjectMapper MAPPER = new ObjectMapper();
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
- private static final int RPC_WAIT_IN_SECONDS = 90;
+ private static final int RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
private final QueryId queryId;
private final RunQuery queryRequest;
@@ -967,7 +967,8 @@ public class Foreman implements Runnable {
* count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
* know if any submissions did fail.
*/
- final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
+ final int numIntFragments = intFragmentMap.keySet().size();
+ final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
@@ -975,16 +976,17 @@ public class Foreman implements Runnable {
sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
- if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+ final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+ if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
long numberRemaining = endpointLatch.getCount();
throw UserException.connectionError()
.message(
- "Exceeded timeout while waiting send intermediate work fragments to remote nodes. Sent %d and only heard response back from %d nodes.",
- intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+ "Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+ "Sent %d and only heard response back from %d nodes.",
+ timeout, numIntFragments, numIntFragments - numberRemaining)
.build();
}
-
// if any of the intermediate fragment submissions failed, fail the query
final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
if (submissionExceptions.size() > 0) {
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 24e2556..d96e6d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -62,6 +62,9 @@ public class FragmentExecutor implements Runnable {
private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
+ // Thread that is currently executing the fragment. Value is null if the fragment hasn't started running or has already finished
+ private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
+
public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
final StatusReporter listener) {
this.fragmentContext = context;
@@ -136,6 +139,14 @@ public class FragmentExecutor implements Runnable {
* We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
*/
updateState(FragmentState.CANCELLATION_REQUESTED);
+
+ /*
+ * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+ */
+ final Thread myThread = myThreadRef.get();
+ if (myThread != null) {
+ myThread.interrupt();
+ }
}
}
@@ -168,6 +179,7 @@ public class FragmentExecutor implements Runnable {
@Override
public void run() {
final Thread myThread = Thread.currentThread();
+ myThreadRef.set(myThread);
final String originalThreadName = myThread.getName();
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
@@ -244,6 +256,8 @@ public class FragmentExecutor implements Runnable {
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
myThread.setName(originalThreadName);
+
+ myThreadRef.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 3e4dcb2..d72d498 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.server;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG;
+import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -43,6 +47,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -437,6 +445,20 @@ public class TestDrillbitResilience {
}
}
+ private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ (new CancellingThread(queryId, ex, null)).start();
+ cancelRequested = true;
+ }
+ result.release();
+ }
+ };
+
/**
* Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
*/
@@ -459,7 +481,9 @@ public class TestDrillbitResilience {
} catch (final RpcException ex) {
this.ex.value = ex;
}
- latch.countDown();
+ if (latch != null) {
+ latch.countDown();
+ }
}
}
@@ -507,13 +531,33 @@ public class TestDrillbitResilience {
* Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
*/
private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+ assertCancelled(controls, TEST_QUERY, listener);
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
+ */
+ private static void assertCancelled(final String controls, final String testQuery,
+ final WaitUntilCompleteListener listener) {
setControls(controls);
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, testQuery, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.CANCELED);
}
+ private static void setSessionOption(final String option, final String value) {
+ try {
+ final List<QueryDataBatch> results = drillClient.runQuery(QueryType.SQL,
+ String.format("alter session set `%s` = %s", option, value));
+ for (final QueryDataBatch data : results) {
+ data.release();
+ }
+ } catch(RpcException e) {
+ fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString()));
+ }
+ }
+
private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
return "{\"injections\" : [{"
+ "\"type\" : \"pause\"," +
@@ -667,4 +711,73 @@ public class TestDrillbitResilience {
final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
}
+
+ /**
+ * Tests that cancelling a query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
+ * Specifically tests cancelling fragment which has {@link MergingRecordBatch} blocked waiting for data.
+ */
+ @Test
+ public void testInterruptingBlockedMergingRecordBatch() {
+ final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
+ testInterruptingBlockedFragmentsWaitingForData(control);
+ }
+
+ /**
+ * Tests that cancelling a query interrupts currently blocked FragmentExecutor threads waiting for some event to happen.
+ * Specifically tests cancelling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
+ */
+ @Test
+ public void testInterruptingBlockedUnorderedReceiverBatch() {
+ final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
+ testInterruptingBlockedFragmentsWaitingForData(control);
+ }
+
+ private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
+ try {
+ setSessionOption(SLICE_TARGET, "1");
+ setSessionOption(HASHAGG.getOptionName(), "false");
+
+ final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+ assertCancelled(control, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+ } finally {
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+ }
+ }
+
+ /**
+ * Tests interrupting the fragment thread that is running {@link PartitionSenderRootExec}.
+ * {@link PartitionSenderRootExec} spawns threads for partitioner. Interrupting fragment thread should also interrupt
+ * the partitioner threads.
+ */
+ @Test
+ public void testInterruptingPartitionerThreadFragment() {
+ try {
+ setSessionOption(SLICE_TARGET, "1");
+ setSessionOption(HASHAGG.getOptionName(), "true");
+ setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+
+ final String controls = "{\"injections\" : ["
+ + "{"
+ + "\"type\" : \"latch\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"partitioner-sender-latch\""
+ + "},"
+ + "{"
+ + "\"type\" : \"pause\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"wait-for-fragment-interrupt\","
+ + "\"nSkip\" : 1"
+ + "}" +
+ "]}";
+
+ final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
+ assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+ } finally {
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
+ setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(),
+ Long.toString(PARTITION_SENDER_SET_THREADS.getDefault().num_val));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
new file mode 100644
index 0000000..2807c35
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.test.DrillTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.core.StringContains.containsString;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit testing for {@link TimedRunnable}.
+ */
+public class TestTimedRunnable extends DrillTest {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+
+ @Rule
+ public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
+
+ private static class TestTask extends TimedRunnable {
+ final long sleepTime; // sleep time in ms
+
+ public TestTask(final long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ protected Void runInner() throws Exception {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ return null;
+ }
+
+ @Override
+ protected IOException convertToIOException(Exception e) {
+ return new IOException("Failure while trying to sleep for sometime", e);
+ }
+ }
+
+ @Test
+ public void withoutAnyTasksTriggeringTimeout() throws Exception {
+ List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+ for(int i=0; i<100; i++){
+ tasks.add(new TestTask(2000));
+ }
+
+ TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16);
+ }
+
+ @Test
+ public void withTasksExceedingTimeout() throws Exception {
+ UserException ex = null;
+
+ try {
+ List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+
+ for (int i = 0; i < 100; i++) {
+ if ((i & (i + 1)) == 0) {
+ tasks.add(new TestTask(2000));
+ } else {
+ tasks.add(new TestTask(20000));
+ }
+ }
+
+ TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+ } catch (UserException e) {
+ ex = e;
+ }
+
+ assertNotNull("Expected a UserException", ex);
+ assertThat(ex.getMessage(),
+ containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
+ "complete. Total runnable size 100, parallelism 16."));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3a294abc/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 8ef2af3..2fe6c28 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -156,6 +156,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
// calling some wait method?
resultsListener.latch.await();
} catch ( InterruptedException e ) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
// Not normally expected--Drill doesn't interrupt in this area (right?)--
// but JDBC client certainly could.
throw new SQLException( "Interrupted", e );