You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:30:54 UTC
[09/12] drill git commit: DRILL-2697: Pause sites wait indefinitely
for a resume signal. DrillClient sends a resume signal to UserServer.
UserServer triggers a resume call in the correct Foreman. Foreman resumes all
pauses related to the query through the Control layer.
DRILL-2697: Pause sites wait indefinitely for a resume signal. DrillClient sends a resume signal to UserServer. UserServer triggers a resume call in the correct Foreman. Foreman resumes all pauses related to the query through the Control layer.
+ Better error messages and more tests in TestDrillbitResilience and TestPauseInjection
+ Added execution controls to operator context
+ Removed ControlMessageHandler interface, renamed ControlHandlerImpl to ControlMessageHandler
+ Added CountDownLatchInjection, useful in cases like PartitionedSender that spawn multiple threads
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/52dcd7e4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/52dcd7e4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/52dcd7e4
Branch: refs/heads/merge_2015_05_09
Commit: 52dcd7e460e37031a08f6194cd597cc035d25a65
Parents: 87051d4
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Thu Apr 30 13:27:08 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:27:11 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/client/DrillClient.java | 10 +-
.../apache/drill/exec/ops/OperatorContext.java | 3 +
.../drill/exec/ops/OperatorContextImpl.java | 8 +
.../drill/exec/physical/impl/ScreenCreator.java | 6 +-
.../exec/rpc/control/ControlRpcConfig.java | 4 +-
.../drill/exec/rpc/control/ControlTunnel.java | 20 +-
.../drill/exec/rpc/user/UserRpcConfig.java | 3 +-
.../apache/drill/exec/rpc/user/UserServer.java | 9 +
.../drill/exec/store/pojo/PojoRecordReader.java | 16 +-
.../exec/testing/CountDownLatchInjection.java | 51 ++++
.../testing/CountDownLatchInjectionImpl.java | 85 ++++++
.../drill/exec/testing/ExecutionControls.java | 37 ++-
.../exec/testing/ExecutionControlsInjector.java | 13 +-
.../apache/drill/exec/testing/Injection.java | 6 +-
.../exec/testing/NoOpControlsInjector.java | 36 ++-
.../drill/exec/testing/PauseInjection.java | 33 +-
.../org/apache/drill/exec/work/WorkManager.java | 3 +-
.../exec/work/batch/ControlHandlerImpl.java | 197 ------------
.../exec/work/batch/ControlMessageHandler.java | 195 +++++++++++-
.../apache/drill/exec/work/foreman/Foreman.java | 25 +-
.../drill/exec/work/foreman/QueryManager.java | 44 ++-
.../exec/work/fragment/FragmentExecutor.java | 15 +-
.../exec/work/fragment/FragmentManager.java | 6 +
.../work/fragment/NonRootFragmentManager.java | 5 +
.../exec/work/fragment/RootFragmentManager.java | 9 +-
.../apache/drill/exec/work/user/UserWorker.java | 8 +
.../exec/server/TestDrillbitResilience.java | 303 +++++++++++--------
.../testing/TestCountDownLatchInjection.java | 155 ++++++++++
.../drill/exec/testing/TestPauseInjection.java | 146 ++++++++-
.../org/apache/drill/exec/proto/BitControl.java | 48 ++-
.../org/apache/drill/exec/proto/UserProtos.java | 46 ++-
.../apache/drill/exec/proto/beans/RpcType.java | 2 +
protocol/src/main/protobuf/BitControl.proto | 12 +-
protocol/src/main/protobuf/User.proto | 1 +
34 files changed, 1106 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 136d8c7..c642c4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -312,10 +312,18 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
- logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+ if(logger.isDebugEnabled()) {
+ logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+ }
return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
}
+ public DrillRpcFuture<Ack> resumeQuery(final QueryId queryId) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Resuming query {}", QueryIdHelper.getQueryId(queryId));
+ }
+ return client.send(RpcType.RESUME_PAUSED_QUERY, queryId, Ack.class);
+ }
/**
* Submits a Logical plan for direct execution (bypasses parsing)
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 7cc52ba..35139d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
public abstract class OperatorContext {
@@ -36,6 +37,8 @@ public abstract class OperatorContext {
public abstract OperatorStats getStats();
+ public abstract ExecutionControls getExecutionControls();
+
public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator();
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 6dbd880..9fa8867 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -24,11 +24,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import org.apache.drill.exec.testing.ExecutionControls;
class OperatorContextImpl extends OperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
private final BufferAllocator allocator;
+ private final ExecutionControls executionControls;
private boolean closed = false;
private PhysicalOperator popConfig;
private OperatorStats stats;
@@ -42,6 +44,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
this.stats = context.getStats().getOperatorStats(def, allocator);
+ executionControls = context.getExecutionControls();
}
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
@@ -49,6 +52,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
this.popConfig = popConfig;
this.stats = stats;
+ executionControls = context.getExecutionControls();
}
public DrillBuf replace(DrillBuf old, int newSize) {
@@ -70,6 +74,10 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return newBuf;
}
+ public ExecutionControls getExecutionControls() {
+ return executionControls;
+ }
+
public BufferAllocator getAllocator() {
if (allocator == null) {
throw new UnsupportedOperationException("Operator context does not have an allocator");
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c31de66..76dc91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -52,7 +52,6 @@ public class ScreenCreator implements RootCreator<Screen>{
static class ScreenRoot extends BaseRootExec {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming;
private final FragmentContext context;
private final AccountingUserConnection userConnection;
@@ -136,6 +135,11 @@ public class ScreenCreator implements RootCreator<Screen>{
}
+ @Override
+ public void close() throws Exception {
+ injector.injectPause(context.getExecutionControls(), "send-complete", logger);
+ super.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index f92bb49..0cfa56e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -40,16 +40,18 @@ public class ControlRpcConfig {
.name("CONTROL")
.timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
.add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
- .add(RpcType.REQ_INIATILIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_INITIALIZE_FRAGMENTS, InitializeFragments.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_CANCEL, QueryId.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
+ .add(RpcType.REQ_UNPAUSE_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
.build();
}
public static int RPC_VERSION = 3;
public static final Response OK = new Response(RpcType.ACK, Acks.OK);
+ public static final Response FAIL = new Response(RpcType.ACK, Acks.FAIL);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index a4f9fdf..16b9b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,12 +17,9 @@
*/
package org.apache.drill.exec.rpc.control;
-import java.util.Collection;
-
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -56,7 +53,12 @@ public class ControlTunnel {
}
public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
- CancelFragment b = new CancelFragment(outcomeListener, handle);
+ final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_CANCEL_FRAGMENT);
+ manager.runCommand(b);
+ }
+
+ public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
+ final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
manager.runCommand(b);
}
@@ -114,17 +116,19 @@ public class ControlTunnel {
}
}
- public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
+ public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
final FragmentHandle handle;
+ final RpcType type;
- public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+ public SignalFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle, RpcType type) {
super(listener);
this.handle = handle;
+ this.type = type;
}
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
}
}
@@ -139,7 +143,7 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENTS, fragments, Ack.class);
+ connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index ae728d8..3f8122d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -36,11 +36,12 @@ public class UserRpcConfig {
return RpcConfig.newBuilder()
.name("USER")
.timeout(config.getInt(ExecConstants.USER_RPC_TIMEOUT))
- .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit.
+ .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) // user to bit
.add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) // user to bit
.add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) // bit to user
.add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) // bit to user
+ .add(RpcType.RESUME_PAUSED_QUERY, QueryId.class, RpcType.ACK, Ack.class) // user to bit
.build();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index b3b7ae9..72b07ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -113,6 +113,15 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
throw new RpcException("Failure while decoding QueryId body.", e);
}
+ case RpcType.RESUME_PAUSED_QUERY_VALUE:
+ try {
+ final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ final Ack ack = worker.resumeQuery(queryId);
+ return new Response(RpcType.ACK, ack);
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding QueryId body.", e);
+ }
+
default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index cf98b83..a893da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -42,13 +42,16 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
public class PojoRecordReader<T> extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
+ private static final ExecutionControlsInjector injector =
+ ExecutionControlsInjector.getInjector(PojoRecordReader.class);
public final int forJsonIgnore = 1;
@@ -64,16 +67,9 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
this.iterator = iterator;
}
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- public void setOperatorContext(OperatorContext operatorContext) {
- this.operatorContext = operatorContext;
- }
-
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ operatorContext = context;
try {
Field[] fields = pojoClass.getDeclaredFields();
List<PojoWriter> writers = Lists.newArrayList();
@@ -147,7 +143,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
@Override
public int next() {
boolean allocated = false;
-
+ injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
try {
int i =0;
outside:
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
new file mode 100644
index 0000000..de4a181
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * This interface is used internally for tracking injected countdown latches. These latches are specified via the
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * This injection is useful in the case where a thread spawns multiple threads. The parent thread initializes the latch
+ * with the expected number of countdowns and awaits. The child threads count down on the same latch (same site class
+ * and same descriptor), and once there are enough, the parent thread continues.
+ */
+public interface CountDownLatchInjection {
+
+ /**
+ * Initializes the underlying latch.
+ * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await}
+ */
+ void initialize(final int count);
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to zero, unless the thread is
+ * {@link Thread#interrupt interrupted}.
+ */
+ void await() throws InterruptedException;
+
+ /**
+ * Await without interruption. In the case of interruption, log a warning and continue to wait.
+ */
+ void awaitUninterruptibly();
+
+ /**
+ * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
+ */
+ void countDown();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
new file mode 100644
index 0000000..f4012c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * See {@link org.apache.drill.exec.testing.CountDownLatchInjection}. Described as degenerating to
+ * {@link org.apache.drill.exec.testing.PauseInjection#pause} for a zero count; note that {@link #initialize} as
+ * written rejects counts that are not positive. This injection provides more control than PauseInjection.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class CountDownLatchInjectionImpl extends Injection implements CountDownLatchInjection {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountDownLatchInjectionImpl.class);
+
+ private ExtendedLatch latch = null; // stays null until initialize() is called; every other method requires it
+
+ @JsonCreator // ensures instances are created only through JSON
+ private CountDownLatchInjectionImpl(@JsonProperty("address") final String address,
+ @JsonProperty("port") final int port,
+ @JsonProperty("siteClass") final String siteClass,
+ @JsonProperty("desc") final String desc) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, 0, 1); // NOTE(review): 0 and 1 are presumably nSkip/nFire -- confirm
+ }
+
+ @Override
+ protected boolean injectNow() {
+ return true; // unconditionally true for latch injections
+ }
+
+ @Override // creates the latch; allowed exactly once, and only with a positive count
+ public void initialize(final int count) {
+ Preconditions.checkArgument(latch == null, "Latch can be initialized only once at %s in %s.", desc,
+ siteClass.getSimpleName());
+ Preconditions.checkArgument(count > 0, "Count has to be a positive integer at %s in %s.", desc,
+ siteClass.getSimpleName());
+ latch = new ExtendedLatch(count);
+ }
+
+ @Override // waits on the latch; an interruption is logged and then rethrown to the caller
+ public void await() throws InterruptedException {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ try {
+ latch.await();
+ } catch (final InterruptedException e) { // SLF4J substitutes {} placeholders (not %s) in the message below
+ logger.warn("Interrupted while awaiting in {} at {}.", siteClass.getSimpleName(), desc);
+ throw e;
+ }
+ }
+
+ @Override // delegates to the latch's uninterruptible wait
+ public void awaitUninterruptibly() {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ latch.awaitUninterruptibly();
+ }
+
+ @Override // decrements the latch; fails fast if the count is already zero
+ public void countDown() {
+ Preconditions.checkNotNull(latch, "Latch not initialized in %s at %s.", siteClass.getSimpleName(), desc);
+ Preconditions.checkArgument(latch.getCount() > 0, "Counting down on latch more than intended.");
+ latch.countDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 1171bf8..639802f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -50,13 +50,15 @@ public final class ExecutionControls {
controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
}
- // Jackson MixIn for all types of injections
+ // Jackson MixIn: an annotated class that is used only by Jackson's ObjectMapper to allow a list of injections to
+ // hold various types of injections
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
@Type(value = ExceptionInjection.class, name = "exception"),
+ @Type(value = CountDownLatchInjectionImpl.class, name = "latch"),
@Type(value = PauseInjection.class, name = "pause")})
public static abstract class InjectionMixIn {
}
@@ -99,7 +101,7 @@ public final class ExecutionControls {
final String jsonString = v.string_val;
try {
controlsOptionMapper.readValue(jsonString, Controls.class);
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
}
}
@@ -137,7 +139,7 @@ public final class ExecutionControls {
final Controls controls;
try {
controls = controlsOptionMapper.readValue(opString, Controls.class);
- } catch (IOException e) {
+ } catch (final IOException e) {
// This never happens. opString must have been validated.
logger.warn("Could not parse injections. Injections must have been validated before this point.");
throw new DrillRuntimeException("Could not parse injections.", e);
@@ -153,7 +155,7 @@ public final class ExecutionControls {
}
/**
- * Look for an exception injection matching the given injector, site description and endpoint.
+ * Look for an exception injection matching the given injector, site descriptor, and endpoint.
*
* @param injector the injector, which indicates a class
* @param desc the injection site description
@@ -165,7 +167,7 @@ public final class ExecutionControls {
}
/**
- * Look for an pause injection matching the given injector, site description and endpoint.
+ * Look for a pause injection matching the given injector, site descriptor, and endpoint.
*
* @param injector the injector, which indicates a class
* @param desc the injection site description
@@ -176,6 +178,20 @@ public final class ExecutionControls {
return injection != null ? (PauseInjection) injection : null;
}
+ /**
+ * Look for a count down latch injection matching the given injector, site descriptor, and endpoint.
+ *
+ * @param injector the injector, which indicates a class
+ * @param desc the injection site description
+ * @return the count down latch injection, if there is one for the injector, site and endpoint;
+ * otherwise, a latch that does nothing
+ */
+ public CountDownLatchInjection lookupCountDownLatchInjection(final ExecutionControlsInjector injector,
+ final String desc) {
+ final Injection injection = lookupInjection(injector, desc);
+ return injection != null ? (CountDownLatchInjection) injection : NoOpControlsInjector.LATCH;
+ }
+
private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
if (controls.isEmpty()) {
return null;
@@ -190,4 +206,15 @@ public final class ExecutionControls {
// return only if injection was meant for this drillbit
return injection.isValidForBit(endpoint) ? injection : null;
}
+
+ /**
+ * This method resumes all pauses within the current context (QueryContext or FragmentContext).
+ */
+ public void unpauseAll() {
+ for (final Injection injection : controls.values()) {
+ if (injection instanceof PauseInjection) {
+ ((PauseInjection) injection).unpause();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 4b1cd0c..05f8433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -75,12 +75,11 @@ public class ExecutionControlsInjector {
* @param desc the site description
* throws the exception specified by the injection, if it is time
*/
- public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+ public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
if (exceptionInjection != null) {
exceptionInjection.throwUnchecked();
}
- return this;
}
/**
@@ -95,13 +94,12 @@ public class ExecutionControlsInjector {
* @param exceptionClass the expected class of the exception (or a super class of it)
* @throws T the exception specified by the injection, if it is time
*/
- public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ public <T extends Throwable> void injectChecked(
final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
if (exceptionInjection != null) {
exceptionInjection.throwChecked(exceptionClass);
}
- return this;
}
/**
@@ -114,7 +112,7 @@ public class ExecutionControlsInjector {
* @param desc the site description
* @param logger logger of the class containing the injection site
*/
- public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ public void injectPause(final ExecutionControls executionControls, final String desc,
final Logger logger) {
final PauseInjection pauseInjection =
executionControls.lookupPauseInjection(this, desc);
@@ -124,6 +122,9 @@ public class ExecutionControlsInjector {
pauseInjection.pause();
logger.debug("Resuming at {}", desc);
}
- return this;
+ }
+
+ public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+ return executionControls.lookupCountDownLatchInjection(this, desc);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
index 96fed3a..08ade51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -28,8 +28,8 @@ public abstract class Injection {
protected final String address; // the address of the drillbit on which to inject
protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
- private final Class<?> siteClass; // the class where the injection should happen
- private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+ protected final Class<?> siteClass; // the class where the injection should happen
+ protected final String desc; // description of the injection site; useful for multiple exception injections in a single class
private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
private final AtomicInteger nFire; // the number of times to do the injection, after any skips; starts > 0
@@ -64,7 +64,7 @@ public abstract class Injection {
*
* @return if the injection should be injected now
*/
- protected final boolean injectNow() {
+ protected boolean injectNow() {
return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index 80d9790..33ab783 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.testing;
import org.slf4j.Logger;
/**
- * An injector that does not inject any controls.
+ * An injector that does not inject any controls, useful when not testing (i.e. assertions are not enabled).
*/
public final class NoOpControlsInjector extends ExecutionControlsInjector {
@@ -29,20 +29,42 @@ public final class NoOpControlsInjector extends ExecutionControlsInjector {
}
@Override
- public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
- return this;
+ public void injectUnchecked(final ExecutionControls executionControls, final String desc) {
}
@Override
- public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ public <T extends Throwable> void injectChecked(
final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
- return this;
}
@Override
- public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ public void injectPause(final ExecutionControls executionControls, final String desc,
final Logger logger) {
- return this;
}
+ /**
+ * A count down latch that does nothing; it is injected when assertions are not enabled.
+ */
+ public static final CountDownLatchInjection LATCH = new CountDownLatchInjection() {
+ @Override
+ public void initialize(int count) {
+ }
+
+ @Override
+ public void await() {
+ }
+
+ @Override
+ public void awaitUninterruptibly() {
+ }
+
+ @Override
+ public void countDown() {
+ }
+ };
+
+ @Override
+ public CountDownLatchInjection getLatch(final ExecutionControls executionControls, final String desc) {
+ return LATCH;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
index e5f9c9c..ff0340b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -21,43 +21,40 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.DrillRuntimeException;
/**
- * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
- * injected pauses; these pauses are specified via
+ * Injection for a single pause. Pause indefinitely until signalled. This class is used internally for tracking
+ * injected pauses. Note that pauses can be fired only once; nFire field is ignored. These pauses are specified via
* {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
*
- * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ * After the pauses are set, the user sends another signal to unpause all the pauses. This triggers the Foreman to
+ * 1) unpause all pauses in QueryContext, and
+ * 2) send an unpause signal to all fragments, each of which unpauses all pauses in FragmentContext.
*/
@JsonAutoDetect(fieldVisibility = Visibility.ANY)
public class PauseInjection extends Injection {
- private final long millis;
+ private final ExtendedLatch latch = new ExtendedLatch(1);
@JsonCreator // ensures instances are created only through JSON
private PauseInjection(@JsonProperty("address") final String address,
@JsonProperty("port") final int port,
@JsonProperty("siteClass") final String siteClass,
@JsonProperty("desc") final String desc,
- @JsonProperty("nSkip") final int nSkip,
- @JsonProperty("nFire") final int nFire,
- @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
- super(address, port, siteClass, desc, nSkip, nFire);
- if (millis <= 0) {
- throw new InjectionConfigurationException("Pause millis is non-positive.");
- }
- this.millis = millis;
+ @JsonProperty("nSkip") final int nSkip) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, nSkip, 1);
}
public void pause() {
- if (! injectNow()) {
+ if (!injectNow()) {
return;
}
- try {
- Thread.sleep(millis);
- } catch (InterruptedException e) {
- throw new DrillRuntimeException("Well, I should be sleeping.");
- }
+ latch.awaitUninterruptibly();
+ }
+
+ public void unpause() {
+ latch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 3e4f3d1..f2352e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.QueryManager;
@@ -116,7 +115,7 @@ public class WorkManager implements AutoCloseable {
// TODO references to this escape here (via WorkerBee) before construction is done
- controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
+ controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
statusThread = new StatusThread();
dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
deleted file mode 100644
index b6c6852..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-import static org.apache.drill.exec.rpc.RpcBus.get;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConstants;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.UserRpcException;
-import org.apache.drill.exec.rpc.control.ControlConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.data.DataRpcConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-
-public class ControlHandlerImpl implements ControlMessageHandler {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
- private final WorkerBee bee;
-
- public ControlHandlerImpl(final WorkerBee bee) {
- this.bee = bee;
- }
-
- @Override
- public Response handle(final ControlConnection connection, final int rpcType,
- final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
- if (RpcConstants.EXTRA_DEBUGGING) {
- logger.debug("Received bit com message of type {}", rpcType);
- }
-
- switch (rpcType) {
-
- case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
- final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
- cancelFragment(handle);
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
- final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
- receivingFragmentFinished(finishedReceiver);
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
- // TODO: Support a type of message that has no response.
- return DataRpcConfig.OK;
-
- case RpcType.REQ_QUERY_CANCEL_VALUE: {
- final QueryId queryId = get(pBody, QueryId.PARSER);
- final Foreman foreman = bee.getForemanForQueryId(queryId);
- if (foreman != null) {
- foreman.cancel();
- return DataRpcConfig.OK;
- } else {
- return DataRpcConfig.FAIL;
- }
- }
-
- case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
- final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
- for(int i = 0; i < fragments.getFragmentCount(); i++) {
- startNewRemoteFragment(fragments.getFragment(i));
- }
- return DataRpcConfig.OK;
- }
-
- case RpcType.REQ_QUERY_STATUS_VALUE: {
- final QueryId queryId = get(pBody, QueryId.PARSER);
- final Foreman foreman = bee.getForemanForQueryId(queryId);
- if (foreman == null) {
- throw new RpcException("Query not running on node.");
- }
- final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
- return new Response(RpcType.RESP_QUERY_STATUS, profile);
- }
-
- default:
- throw new RpcException("Not yet supported.");
- }
- }
-
- @Override
- public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
- logger.debug("Received remote fragment start instruction", fragment);
-
- final DrillbitContext drillbitContext = bee.getContext();
- try {
- // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
- if (fragment.getLeafFragment()) {
- final FragmentContext context = new FragmentContext(drillbitContext, fragment,
- drillbitContext.getFunctionImplementationRegistry());
- final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
- final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
- fragment.getFragmentJson());
- final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
- bee.addFragmentRunner(fr);
- } else {
- // isIntermediate, store for incoming data.
- final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
- drillbitContext.getWorkBus().addFragmentManager(manager);
- }
-
- } catch (final Exception e) {
- throw new UserRpcException(drillbitContext.getEndpoint(),
- "Failure while trying to start remote fragment", e);
- } catch (final OutOfMemoryError t) {
- if (t.getMessage().startsWith("Direct buffer")) {
- throw new UserRpcException(drillbitContext.getEndpoint(),
- "Out of direct memory while trying to start remote fragment", t);
- } else {
- throw t;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
- */
- @Override
- public Ack cancelFragment(final FragmentHandle handle) {
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
- if (manager != null) {
- // try remote fragment cancel.
- manager.cancel();
- } else {
- // then try local cancel.
- final FragmentExecutor runner = bee.getFragmentRunner(handle);
- if (runner != null) {
- runner.cancel();
- }
- }
-
- return Acks.OK;
- }
-
- private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
-
- final FragmentManager manager =
- bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
-
- FragmentExecutor executor;
- if (manager != null) {
- manager.receivingFragmentFinished(finishedReceiver.getReceiver());
- } else {
- // then try local cancel.
- executor = bee.getFragmentRunner(finishedReceiver.getSender());
- if (executor != null) {
- executor.receivingFragmentFinished(finishedReceiver.getReceiver());
- } else {
- logger.warn(
- "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
- QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
- QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
- }
- }
-
- return Acks.OK;
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index c5d78cc..d12e6d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,24 +17,207 @@
*/
package org.apache.drill.exec.work.batch;
+import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
+import org.apache.drill.exec.rpc.control.ControlRpcConfig;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
-public interface ControlMessageHandler {
+public class ControlMessageHandler {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
+ private final WorkerBee bee;
- public abstract Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody)
- throws RpcException;
+ public ControlMessageHandler(final WorkerBee bee) {
+ this.bee = bee;
+ }
- public abstract void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException;
+ public Response handle(final ControlConnection connection, final int rpcType,
+ final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Received bit com message of type {}", rpcType);
+ }
- public abstract Ack cancelFragment(FragmentHandle handle);
+ switch (rpcType) {
+ case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ cancelFragment(handle);
+ return ControlRpcConfig.OK;
+ }
-}
\ No newline at end of file
+ case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+ final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+ receivingFragmentFinished(finishedReceiver);
+ return ControlRpcConfig.OK;
+ }
+
+ case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+ bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
+ // TODO: Support a type of message that has no response.
+ return ControlRpcConfig.OK;
+
+ case RpcType.REQ_QUERY_CANCEL_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.cancel();
+ return ControlRpcConfig.OK;
+ } else {
+ return ControlRpcConfig.FAIL;
+ }
+ }
+
+ case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+ final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+ for(int i = 0; i < fragments.getFragmentCount(); i++) {
+ startNewRemoteFragment(fragments.getFragment(i));
+ }
+ return ControlRpcConfig.OK;
+ }
+
+ case RpcType.REQ_QUERY_STATUS_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman == null) {
+ throw new RpcException("Query not running on node.");
+ }
+ final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+ return new Response(RpcType.RESP_QUERY_STATUS, profile);
+ }
+
+ case RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ resumeFragment(handle);
+ return ControlRpcConfig.OK;
+ }
+
+ default:
+ throw new RpcException("Not yet supported.");
+ }
+ }
+
+ private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
+ logger.debug("Received remote fragment start instruction", fragment);
+
+ final DrillbitContext drillbitContext = bee.getContext();
+ try {
+ // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+ if (fragment.getLeafFragment()) {
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment,
+ drillbitContext.getFunctionImplementationRegistry());
+ final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+ final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+ final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+ fragment.getFragmentJson());
+ final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+ bee.addFragmentRunner(fr);
+ } else {
+ // isIntermediate, store for incoming data.
+ final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+ drillbitContext.getWorkBus().addFragmentManager(manager);
+ }
+
+ } catch (final Exception e) {
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Failure while trying to start remote fragment", e);
+ } catch (final OutOfMemoryError t) {
+ if (t.getMessage().startsWith("Direct buffer")) {
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Out of direct memory while trying to start remote fragment", t);
+ } else {
+ throw t;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
+ */
+ private Ack cancelFragment(final FragmentHandle handle) {
+ // cancel a pending fragment
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ if (manager != null) {
+ manager.cancel();
+ return Acks.OK;
+ }
+
+ // cancel a running fragment
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
+ if (runner != null) {
+ runner.cancel();
+ return Acks.OK;
+ }
+
+ // fragment completed or does not exist
+ logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+ return Acks.OK;
+ }
+
+ private Ack resumeFragment(final FragmentHandle handle) {
+ // resume a pending fragment
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ if (manager != null) {
+ manager.unpause();
+ return Acks.OK;
+ }
+
+ // resume a paused fragment
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
+ if (runner != null) {
+ runner.unpause();
+ return Acks.OK;
+ }
+
+ // fragment completed or does not exist
+ logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+ return Acks.OK;
+ }
+
+ private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
+ final FragmentManager manager =
+ bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+
+ FragmentExecutor executor;
+ if (manager != null) {
+ manager.receivingFragmentFinished(finishedReceiver.getReceiver());
+ } else {
+ executor = bee.getFragmentRunner(finishedReceiver.getSender());
+ if (executor != null) {
+ executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+ } else {
+ logger.warn(
+ "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+ QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+ QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+ }
+ }
+
+ return Acks.OK;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4d403b8..0122ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -119,6 +119,7 @@ public class Foreman implements Runnable {
private final DrillbitContext drillbitContext;
private final UserClientConnection initiatingClient; // used to send responses
private volatile QueryState state;
+ private boolean resume = false;
private volatile DistributedLease lease; // used to limit the number of concurrent queries
@@ -196,6 +197,18 @@ public class Foreman implements Runnable {
}
/**
+ * Resume the query. Regardless of the current state, this method sends a resume signal to all fragments.
+ * This method can be called multiple times.
+ */
+ public void resume() {
+ resume = true;
+ // resume all pauses through query context
+ queryContext.getExecutionControls().unpauseAll();
+ // resume all pauses through all fragment contexts
+ queryManager.unpauseExecutingFragments(drillbitContext, rootRunner);
+ }
+
+ /**
* Called by execution pool to do query setup, and kick off remote execution.
*
* <p>Note that completion of this function is not the end of the Foreman's role
@@ -268,9 +281,20 @@ public class Foreman implements Runnable {
* If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
* make sure that we can't make things any worse as those events are delivered, but allow
* any necessary remaining cleanup to proceed.
+ *
+ * Note that cancellations cannot be simulated by injecting pauses before this point, because the Foreman
+ * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
+ * to accept events.
*/
acceptExternalEvents.countDown();
+ // If we received the resume signal before fragments are set up, the first call does not actually resume the
+ // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+ if(resume) {
+ resume();
+ }
+ injector.injectPause(queryContext.getExecutionControls(), "foreman-ready", logger);
+
// restore the thread's original name
currentThread.setName(originalName);
}
@@ -375,7 +399,6 @@ public class Foreman implements Runnable {
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
logger.debug("Submitting fragments to run.");
- injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
// set up the root fragment first so we'll have incoming buffers available.
setupRootFragment(rootPlanFragment, work.getRootOperator());
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index c4646bd..090a377 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -189,7 +189,8 @@ public class QueryManager {
final DrillbitEndpoint endpoint = data.getEndpoint();
final FragmentHandle handle = data.getHandle();
// TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
- controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+ controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
+ SignalListener.Signal.CANCEL), handle);
}
break;
@@ -203,25 +204,52 @@ public class QueryManager {
}
}
+ /**
+ * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
+ * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the
+ * control tunnel.
+ */
+ void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ if (rootRunner != null) {
+ rootRunner.unpause();
+ }
+ final Controller controller = drillbitContext.getController();
+ for(final FragmentData data : fragmentDataSet) {
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ final FragmentHandle handle = data.getHandle();
+ controller.getTunnel(endpoint).resumeFragment(new SignalListener(endpoint, handle,
+ SignalListener.Signal.UNPAUSE), handle);
+ }
+ }
+
/*
* This assumes that the FragmentStatusListener implementation takes action when it hears
- * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+ * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
* but log messages.
*/
- private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
- public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+ private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
+ /**
+ * An enum of possible signals that {@link SignalListener} listens to.
+ */
+ public static enum Signal { CANCEL, UNPAUSE }
+
+ private final Signal signal;
+
+ public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
super(endpoint, handle);
+ this.signal = signal;
}
@Override
public void failed(final RpcException ex) {
- logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+ logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", signal, value, endpoint, ex);
}
@Override
- public void success(final Ack value, final ByteBuf buf) {
- if (!value.getOk()) {
- logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+ public void success(final Ack ack, final ByteBuf buf) {
+ if (!ack.getOk()) {
+ logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
+ ack);
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7baafc4..24e2556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -108,7 +108,7 @@ public class FragmentExecutor implements Runnable {
/**
* Cancel the execution of this fragment if it is in an appropriate state. Messages come from external sources.
- * NOTE that this can be called from threads *other* than the one running this runnable(),
+ * NOTE that this will be called from threads *other* than the one running this runnable(),
* so we need to be careful about the state transitions that can result.
*/
public void cancel() {
@@ -140,11 +140,18 @@ public class FragmentExecutor implements Runnable {
}
/**
+ * Resume all the pauses within the current context. Note that this method will be called from threads *other* than
+ * the one running this runnable(). Also, this method can be called multiple times.
+ */
+ public synchronized void unpause() {
+ fragmentContext.getExecutionControls().unpauseAll();
+ }
+
+ /**
* Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
* called in the case that a limit query is executed.
*
- * @param handle
- * The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+ * @param handle The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
*/
public void receivingFragmentFinished(final FragmentHandle handle) {
acceptExternalEvents.awaitUninterruptibly();
@@ -277,6 +284,8 @@ public class FragmentExecutor implements Runnable {
// first close the operators and release all memory.
try {
+ // If the executor was cancelled before setup, root is never initialized when the executor actually runs, but
+ // this method is still called from the finally block. So root can be null.
if (root != null) {
root.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 0ba91b4..ad880da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -49,6 +49,12 @@ public interface FragmentManager {
public abstract void cancel();
+ /**
+ * If the executor is paused (for testing), this method should unpause the executor. This method should handle
+ * multiple calls.
+ */
+ public abstract void unpause();
+
public boolean isWaiting();
public abstract FragmentHandle getHandle();
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index f526fbe..ca5d5b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -112,6 +112,11 @@ public class NonRootFragmentManager implements FragmentManager {
}
@Override
+ public void unpause() {
+ runner.unpause();
+ }
+
+ @Override
public FragmentHandle getHandle() {
return fragment.getHandle();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b1c3fe0..67ef9b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -29,8 +29,8 @@ import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.work.batch.IncomingBuffers;
// TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+public class RootFragmentManager implements FragmentManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
@@ -70,6 +70,11 @@ public class RootFragmentManager implements FragmentManager{
}
@Override
+ public void unpause() {
+ runner.unpause();
+ }
+
+ @Override
public boolean isWaiting() {
return !buffers.isDone() && !cancel;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/52dcd7e4/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 8854ef3..e8deb4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -68,6 +68,14 @@ public class UserWorker{
return Acks.OK;
}
+ public Ack resumeQuery(final QueryId queryId) {
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.resume();
+ }
+ return Acks.OK;
+ }
+
public OptionManager getSystemOptions() {
return bee.getContext().getOptionManager();
}