You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/21 05:00:21 UTC
[1/2] drill git commit: DRILL-2383: Support to inject exceptions and
pauses in various components of Drill + Controls are fired only if assertions
are enabled + Controls can be introduced in any class that has access to
FragmentContext/QueryContext + Con
Repository: drill
Updated Branches:
refs/heads/master 21dfe7ac8 -> be8d95393
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
deleted file mode 100644
index 0292c08..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Tracks the simulated exceptions that will be injected for testing purposes.
- */
-public class SimulatedExceptions {
- /**
- * Caches the currently specified ExceptionInjections. Updated when
- * {@link org.apache.drill.exec.ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS} is noticed
- * to have changed.
- */
- private HashMap<InjectionSite, ExceptionInjection> exMap = null;
-
- /**
- * The string that was parsed to produce exMap; we keep it as a means to quickly detect whether
- * the option string has changed or not between calls to getOption().
- */
- private String exString = null;
-
- /**
- * POJO used to parse JSON-specified exception injection.
- */
- public static class InjectionOption {
- public String siteClass;
- public String desc;
- public int nSkip;
- public int nFire;
- public String exceptionClass;
- }
-
- /**
- * POJO used to parse JSON-specified set of exception injections.
- */
- public static class InjectionOptions {
- public InjectionOption injections[];
- }
-
- /**
- * Look for an exception injection matching the given injector and site description.
- *
- * @param drillbitContext
- * @param injector the injector, which indicates a class
- * @param desc the injection site description
- * @return the exception injection, if there is one for the injector and site; null otherwise
- */
- public synchronized ExceptionInjection lookupInjection(
- final DrillbitContext drillbitContext, final ExceptionInjector injector, final String desc) {
- // get the option string
- final OptionManager optionManager = drillbitContext.getOptionManager();
- final OptionValue optionValue = optionManager.getOption(ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS);
- final String opString = optionValue.string_val;
-
- // if the option string is empty, there's nothing to inject
- if ((opString == null) || opString.isEmpty()) {
- // clear these in case there used to be something to inject
- exMap = null;
- exString = null;
- return null;
- }
-
- // if the option string is different from before, recreate the injection map
- if ((exString == null) || (exString != opString) && !exString.equals(opString)) {
- // parse the option string into JSON
- final ObjectMapper objectMapper = new ObjectMapper();
- InjectionOptions injectionOptions;
- try {
- injectionOptions = objectMapper.readValue(opString, InjectionOptions.class);
- } catch(IOException e) {
- throw new RuntimeException("Couldn't parse exception injections", e);
- }
-
- // create a new map from the option JSON
- exMap = new HashMap<>();
- for(InjectionOption injectionOption : injectionOptions.injections) {
- addToMap(exMap, injectionOption);
- }
-
- // this is the current set of options in effect
- exString = opString;
- }
-
- // lookup the request
- final InjectionSite injectionSite = new InjectionSite(injector.getSiteClass(), desc);
- final ExceptionInjection injection = exMap.get(injectionSite);
- return injection;
- }
-
- /**
- * Adds a single exception injection to the injection map
- *
- * <p>Validates injection options before adding to the map, and throws various exceptions for
- * validation failures.
- *
- * @param exMap the injection map
- * @param injectionOption the option to add
- */
- private static void addToMap(
- final HashMap<InjectionSite, ExceptionInjection> exMap, final InjectionOption injectionOption) {
- Class<?> siteClass;
- try {
- siteClass = Class.forName(injectionOption.siteClass);
- } catch(ClassNotFoundException e) {
- throw new RuntimeException("Injection siteClass not found", e);
- }
-
- if ((injectionOption.desc == null) || injectionOption.desc.isEmpty()) {
- throw new RuntimeException("Injection desc is null or empty");
- }
-
- if (injectionOption.nSkip < 0) {
- throw new RuntimeException("Injection nSkip is not non-negative");
- }
-
- if (injectionOption.nFire <= 0) {
- throw new RuntimeException("Injection nFire is non-positive");
- }
-
- Class<?> clazz;
- try {
- clazz = Class.forName(injectionOption.exceptionClass);
- } catch(ClassNotFoundException e) {
- throw new RuntimeException("Injected exceptionClass not found", e);
- }
-
- if (!Throwable.class.isAssignableFrom(clazz)) {
- throw new RuntimeException("Injected exceptionClass is not a Throwable");
- }
-
- @SuppressWarnings("unchecked")
- final Class<? extends Throwable> exceptionClass = (Class<? extends Throwable>) clazz;
-
- final InjectionSite injectionSite = new InjectionSite(siteClass, injectionOption.desc);
- final ExceptionInjection exceptionInjection = new ExceptionInjection(
- injectionOption.desc, injectionOption.nSkip, injectionOption.nFire, exceptionClass);
- exMap.put(injectionSite, exceptionInjection);
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94b9f0..d94ffba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -67,7 +67,7 @@ import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.testing.ExceptionInjector;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.QueryWorkUnit;
@@ -100,7 +100,7 @@ import com.google.common.collect.Sets;
*/
public class Foreman implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
- private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
private static final int RPC_WAIT_IN_SECONDS = 90;
private final QueryId queryId;
@@ -190,8 +190,7 @@ public class Foreman implements Runnable {
queryManager.markStartTime();
try {
- injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class);
-
+ injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
// convert a run query request into action
switch (queryRequest.getType()) {
case LOGICAL:
@@ -206,7 +205,7 @@ public class Foreman implements Runnable {
default:
throw new IllegalStateException();
}
- injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
+ injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
} catch (final ForemanException e) {
moveToState(QueryState.FAILED, e);
} catch (AssertionError | Exception ex) {
@@ -346,6 +345,7 @@ public class Foreman implements Runnable {
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
logger.debug("Submitting fragments to run.");
+ injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
// set up the root fragment first so we'll have incoming buffers available.
setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
@@ -586,6 +586,7 @@ public class Foreman implements Runnable {
Preconditions.checkState(resultState != null);
logger.info("foreman cleaning up.");
+ injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
// These are straight forward removals from maps, so they won't throw.
drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
@@ -900,6 +901,7 @@ public class Foreman implements Runnable {
.build();
}
+ injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class);
/*
* Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
* the regular sendListener event delivery.
@@ -927,7 +929,7 @@ public class Foreman implements Runnable {
}
final InitializeFragments initFrags = fb.build();
- logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
+ logger.debug("Sending remote fragments to \nNode:\n{} \n\nData:\n{}", assignment, initFrags);
final FragmentSubmitListener listener =
new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
controller.getTunnel(assignment).sendFragments(listener, initFrags);
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 3570ba5..be798ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.fragment;
+import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,6 +36,8 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
/**
@@ -43,6 +46,7 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
*/
public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
private final String fragmentName;
private final FragmentRoot rootOperator;
@@ -101,16 +105,29 @@ public class FragmentExecutor implements Runnable {
/**
* Cancel the execution of this fragment is in an appropriate state. Messages come from external.
+ * NOTE that this can be called from threads *other* than the one running this runnable(),
+ * so we need to be careful about the state transitions that can result.
*/
public void cancel() {
- acceptExternalEvents.awaitUninterruptibly();
-
/*
- * Note that this can be called from threads *other* than the one running this runnable(), so we need to be careful
- * about the state transitions that can result. We set the cancel requested flag but the actual cancellation is
- * managed by the run() loop.
+ * When cancel() is called before run(), root is not initialized and the executor is not
+ * ready to accept external events. So do not wait to change the state.
+ *
+ * For example, consider the case when the Foreman sets up the root fragment executor which is
+ * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
+ * run() method on the root executor will never be called, and the executor will never be ready
+ * to accept external events. This will make the cancelling thread wait forever.
*/
- updateState(FragmentState.CANCELLATION_REQUESTED);
+ synchronized (this) {
+ if (root != null) {
+ acceptExternalEvents.awaitUninterruptibly();
+ }
+
+ /*
+ * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
+ */
+ updateState(FragmentState.CANCELLATION_REQUESTED);
+ }
}
/**
@@ -137,7 +154,8 @@ public class FragmentExecutor implements Runnable {
final Thread myThread = Thread.currentThread();
final String originalThreadName = myThread.getName();
final FragmentHandle fragmentHandle = fragmentContext.getHandle();
- final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
+ final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
+ final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
@@ -145,16 +163,26 @@ public class FragmentExecutor implements Runnable {
myThread.setName(newThreadName);
- root = ImplCreator.getExec(fragmentContext, rootOperator);
+ synchronized (this) {
+ /*
+ * fragmentState might have changed even before this method is called e.g. cancel()
+ */
+ if (shouldContinue()) {
+ root = ImplCreator.getExec(fragmentContext, rootOperator);
- clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
- updateState(FragmentState.RUNNING);
+ clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+ updateState(FragmentState.RUNNING);
- acceptExternalEvents.countDown();
+ acceptExternalEvents.countDown();
- logger.debug("Starting fragment runner. {}:{}",
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+ final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+ logger.debug("Starting fragment {}:{} on {}:{}",
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+ endpoint.getAddress(), endpoint.getUserPort());
+ }
+ }
+ injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
/*
* Run the query until root.next returns false OR we no longer need to continue.
*/
@@ -163,7 +191,6 @@ public class FragmentExecutor implements Runnable {
}
updateState(FragmentState.FINISHED);
-
} catch (AssertionError | Exception e) {
fail(e);
} finally {
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 854f474..8854ef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -32,6 +34,12 @@ public class UserWorker{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
private final WorkerBee bee;
+ private final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
+ @Override
+ public void increment(final UserSession session) {
+ session.incrementQueryCount(this);
+ }
+ };
public UserWorker(WorkerBee bee) {
super();
@@ -46,6 +54,7 @@ public class UserWorker{
long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
long p2 = r.nextLong();
QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
+ incrementer.increment(connection.getSession());
Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
bee.addNewForeman(foreman);
return id;
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index d8ed0b3..2ff4de7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -370,7 +371,7 @@ public class BaseTestQuery extends ExecTest {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
System.out.println("Query completed successfully with row count: " + count.get());
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 264123f..c3223b8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.QueryOptionManager;
@@ -44,6 +43,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.testing.ExecutionControls;
import org.junit.Rule;
import org.junit.rules.TestRule;
@@ -77,8 +77,10 @@ public class PlanningBase extends ExecTest{
final SystemOptionManager systemOptions = new SystemOptionManager(config, provider);
systemOptions.init();
- final SessionOptionManager sessionOptions = new SessionOptionManager(systemOptions);
+ final UserSession userSession = UserSession.Builder.newBuilder().withOptionManager(systemOptions).build();
+ final SessionOptionManager sessionOptions = (SessionOptionManager) userSession.getOptions();
final QueryOptionManager queryOptions = new QueryOptionManager(sessionOptions);
+ final ExecutionControls executionControls = new ExecutionControls(queryOptions, DrillbitEndpoint.getDefaultInstance());
new NonStrictExpectations() {
{
@@ -126,6 +128,8 @@ public class PlanningBase extends ExecTest{
result = table;
context.getAllocator();
result = allocator;
+ context.getExecutionControls();
+ result = executionControls;
}
};
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 99aa9fc..2cd5c95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -58,7 +58,8 @@ public abstract class SingleRowListener implements UserResultsListener {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
+ queryState = state;
try {
cleanup();
} finally {
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index f5f5b8d..f2240cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.options.SessionOptionManager;
@@ -43,7 +44,10 @@ public class TestClassTransformation extends BaseTestQuery {
@BeforeClass
public static void beforeTestClassTransformation() throws Exception {
- sessionOptions = new SessionOptionManager(getDrillbitContext().getOptionManager());
+ final UserSession userSession = UserSession.Builder.newBuilder()
+ .withOptionManager(getDrillbitContext().getOptionManager())
+ .build();
+ sessionOptions = (SessionOptionManager) userSession.getOptions();
}
@Test
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 7aee6d3..ee0e841 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,14 +17,7 @@
*/
package org.apache.drill.exec.server;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.commons.math3.util.Pair;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
@@ -32,41 +25,57 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.ExceptionWrapper;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.testing.ExceptionInjectionUtil;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points.
+ * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
*/
-public class TestDrillbitResilience extends ExecTest {
+public class TestDrillbitResilience {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
private static ZookeeperHelper zkHelper;
@@ -74,6 +83,13 @@ public class TestDrillbitResilience extends ExecTest {
private static final Map<String, Drillbit> drillbits = new HashMap<>();
private static DrillClient drillClient;
+ /**
+ * Note: Counting sys.memory executes a fragment on every drillbit. This is a better check in comparison to
+ * counting sys.drillbits.
+ */
+ private static final String TEST_QUERY = "select * from sys.memory";
+ private static final long PAUSE_TIME_MILLIS = 1000L;
+
private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
if (drillbits.containsKey(name)) {
throw new IllegalStateException("Drillbit named \"" + name + "\" already exists");
@@ -83,7 +99,7 @@ public class TestDrillbitResilience extends ExecTest {
@SuppressWarnings("resource")
final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
drillbits.put(name, drillbit);
- } catch(DrillbitStartupException e) {
+ } catch (final DrillbitStartupException e) {
throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e);
}
}
@@ -91,7 +107,7 @@ public class TestDrillbitResilience extends ExecTest {
/**
* Shutdown the specified drillbit.
*
- * @param name
+ * @param name name of the drillbit
*/
private static void stopDrillbit(final String name) {
@SuppressWarnings("resource")
@@ -102,7 +118,7 @@ public class TestDrillbitResilience extends ExecTest {
try {
drillbit.close();
- } catch(Exception e) {
+ } catch (final Exception e) {
final String message = "Error shutting down Drillbit \"" + name + "\"";
System.err.println(message + '.');
logger.warn(message, e);
@@ -113,9 +129,10 @@ public class TestDrillbitResilience extends ExecTest {
* Shutdown all the drillbits.
*/
private static void stopAllDrillbits() {
- for(String name : drillbits.keySet()) {
+ for (String name : drillbits.keySet()) {
stopDrillbit(name);
}
+ drillbits.clear();
}
/*
@@ -140,11 +157,11 @@ public class TestDrillbitResilience extends ExecTest {
startDrillbit(DRILLBIT_ALPHA, remoteServiceSet);
startDrillbit(DRILLBIT_BETA, remoteServiceSet);
startDrillbit(DRILLBIT_GAMMA, remoteServiceSet);
- clearAllInjections();
// create a client
final DrillConfig drillConfig = zkHelper.getConfig();
drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, null);
+ clearAllInjections();
}
@AfterClass
@@ -165,17 +182,16 @@ public class TestDrillbitResilience extends ExecTest {
}
/**
- * Clear all injections from all drillbits.
+ * Clear all exceptions.
*/
private static void clearAllInjections() {
- for(Drillbit drillbit : drillbits.values()) {
- ExceptionInjectionUtil.clearInjections(drillbit);
- }
+ assertTrue(drillClient != null);
+ ControlsInjectionUtil.clearControls(drillClient);
}
/**
* Check that all the drillbits are ok.
- *
+ * <p/>
* <p>The current implementation does this by counting the number of drillbits using a
* query.
*/
@@ -190,7 +206,7 @@ public class TestDrillbitResilience extends ExecTest {
final QueryData queryData = queryResultBatch.getHeader();
try {
loader.load(queryData.getDef(), queryResultBatch.getData());
- } catch(SchemaChangeException e) {
+ } catch (final SchemaChangeException e) {
fail(e.toString());
}
assertEquals(1, loader.getRecordCount());
@@ -223,10 +239,10 @@ public class TestDrillbitResilience extends ExecTest {
};
try {
- QueryTestUtil.testWithListener(
- drillClient, QueryType.SQL, "select count(*) from sys.drillbits", listener);
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
listener.waitForCompletion();
- } catch(Exception e) {
+ assertTrue(listener.getQueryState() == QueryState.COMPLETED);
+ } catch (final Exception e) {
throw new RuntimeException("Couldn't query active drillbits", e);
}
@@ -242,88 +258,91 @@ public class TestDrillbitResilience extends ExecTest {
}
/**
- * Set the given injections on a single named drillbit.
- *
- * @param bitName
- * @param injectionOptions the injections
- */
- private static void setInjections(final String bitName, final InjectionOptions injectionOptions) {
- @SuppressWarnings("resource")
- final Drillbit drillbit = drillbits.get(bitName);
- if (drillbit == null) {
- throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found");
- }
-
- ExceptionInjectionUtil.setInjections(drillbit, injectionOptions);
- }
-
- /**
- * Set the given injections on all drillbits.
- *
- * @param injectionOptions the injections
+ * Set the given exceptions.
*/
- private static void setInjectionsAll(final InjectionOptions injectionOptions) {
- for(Drillbit drillbit : drillbits.values()) {
- ExceptionInjectionUtil.setInjections(drillbit, injectionOptions);
- }
+ private static void setExceptions(final String controlsString) {
+ ControlsInjectionUtil.setControls(drillClient, controlsString);
}
/**
* Create a single exception injection.
*
- * @param siteClassName the name of the injection site class
- * @param desc the injection site description
- * @param exceptionClassName the name of the exception to throw
- * @return the created injection options POJO
+ * @param siteClass the injection site class
+ * @param desc the injection site description
+ * @param exceptionClass the class of the exception to throw
+ * @return the created controls JSON as string
*/
- private static InjectionOptions createSingleInjection(
- final String siteClassName, final String desc, final String exceptionClassName) {
- final InjectionOption injectionOption = new InjectionOption();
- injectionOption.nFire = 1;
- injectionOption.siteClass = siteClassName;
- injectionOption.desc = desc;
- injectionOption.exceptionClass = exceptionClassName;
-
- final InjectionOptions injectionOptions = new InjectionOptions();
- injectionOptions.injections = new InjectionOption[1];
- injectionOptions.injections[0] = injectionOption;
-
- return injectionOptions;
+ private static String createSingleException(final Class<?> siteClass, final String desc,
+ final Class<? extends Throwable> exceptionClass) {
+ final String siteClassName = siteClass.getName();
+ final String exceptionClassName = exceptionClass.getName();
+ return "{\"injections\":[{"
+ + "\"type\":\"exception\","
+ + "\"siteClass\":\"" + siteClassName + "\","
+ + "\"desc\":\"" + desc + "\","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1,"
+ + "\"exceptionClass\":\"" + exceptionClassName + "\""
+ + "}]}";
}
/**
* Create a single exception injection.
*
- * @param siteClass the injection site class
- * @param desc the injection site description
+ * @param siteClass the injection site class
+ * @param desc the injection site description
* @param exceptionClass the class of the exception to throw
- * @return the created injection options POJO
+ * @param bitName the drillbit name which should be injected into
+ * @return the created controls JSON as string
*/
- private static InjectionOptions createSingleInjection(
- final Class<?> siteClass, final String desc, final Class<? extends Throwable> exceptionClass) {
- return createSingleInjection(siteClass.getName(), desc, exceptionClass.getName());
+ private static String createSingleExceptionOnBit(final Class<?> siteClass, final String desc,
+ final Class<? extends Throwable> exceptionClass,
+ final String bitName) {
+ final String siteClassName = siteClass.getName();
+ final String exceptionClassName = exceptionClass.getName();
+ @SuppressWarnings("resource")
+ final Drillbit drillbit = drillbits.get(bitName);
+ if (drillbit == null) {
+ throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found");
+ }
+
+ final DrillbitEndpoint endpoint = drillbit.getContext().getEndpoint();
+ return "{\"injections\":[{"
+ + "\"address\":\"" + endpoint.getAddress() + "\","
+ + "\"port\":\"" + endpoint.getUserPort() + "\","
+ + "\"type\":\"exception\","
+ + "\"siteClass\":\"" + siteClassName + "\","
+ + "\"desc\":\"" + desc + "\","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1,"
+ + "\"exceptionClass\":\"" + exceptionClassName + "\""
+ + "}]}";
}
/**
* Check that the injected exception is what we were expecting.
*
- * @param caught the exception that was caught (by the test)
+ * @param throwable the throwable that was caught (by the test)
* @param exceptionClass the expected exception class
- * @param desc the expected exception site description
+ * @param desc the expected exception site description
*/
- private static void assertInjected(
- final UserException caught, final Class<? extends Throwable> exceptionClass, final String desc) {
- ExceptionWrapper cause = caught.getOrCreatePBError(false).getException();
+ private static void assertExceptionInjected(final Throwable throwable,
+ final Class<? extends Throwable> exceptionClass, final String desc) {
+ assertTrue(throwable instanceof UserException);
+ final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
assertEquals(exceptionClass.getName(), cause.getExceptionClass());
assertEquals(desc, cause.getMessage());
}
@Test
- public void testSettingNoopInjectionsAndQuery() throws Exception {
- final InjectionOptions injectionOptions =
- createSingleInjection(getClass(), "noop", RuntimeException.class);
- setInjections(DRILLBIT_BETA, injectionOptions);
- QueryTestUtil.test(drillClient, "select * from sys.drillbits");
+ public void settingNoopInjectionsAndQuery() {
+ final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
+ setExceptions(controls);
+ try {
+ QueryTestUtil.test(drillClient, TEST_QUERY);
+ } catch (final Exception e) {
+ fail(e.getMessage());
+ }
}
/**
@@ -331,51 +350,25 @@ public class TestDrillbitResilience extends ExecTest {
* description
*
* @param desc site description
- * @throws Exception
*/
- private static void testForeman(final String desc) throws Exception {
- final InjectionOptions injectionOptions = createSingleInjection(Foreman.class, desc, ForemanException.class);
- setInjectionsAll(injectionOptions);
+ private static void testForeman(final String desc) {
+ final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
+ setExceptions(controls);
try {
- QueryTestUtil.test(drillClient, "select * from sys.drillbits");
+ QueryTestUtil.test(drillClient, TEST_QUERY);
fail();
- } catch(UserException dre) {
- assertInjected(dre, ForemanException.class, desc);
+ } catch (final Exception e) {
+ assertExceptionInjected(e, ForemanException.class, desc);
}
}
@SuppressWarnings("static-method")
@Test
- public void testForeman_runTryBeginning() throws Exception {
+ public void foreman_runTryBeginning() {
testForeman("run-try-beginning");
}
- @SuppressWarnings("static-method")
- @Test
- public void testForeman_setInjectionViaAlterSystem() throws Exception {
- final String exceptionDesc = "run-try-beginning";
- final InjectionOptions injectionOptions =
- createSingleInjection(Foreman.class, exceptionDesc, ForemanException.class);
- final ObjectMapper objectMapper = new ObjectMapper();
- final String jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(injectionOptions);
- final String alterSession = String.format(
- "alter system set `%s`='%s'",
- ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, jsonString);
- QueryTestUtil.test(drillClient, alterSession);
- try {
- QueryTestUtil.test(drillClient, "select * from sys.drillbits");
- fail();
- } catch(UserException dre) {
- assertInjected(dre, ForemanException.class, exceptionDesc);
- }
- }
-
/*
- * This test doesn't work because worker threads have returned the result to the client before
- * Foreman.run() has even finished executing. This might not happen if the results are larger.
- * This brings up the question of how we detect failed queries, because here a failure is happening
- * after the query starts running, yet apparently the query still succeeds.
- *
* TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way
* that it gates input via stateListener. That could be tricky, since some results could be
* queued up before Foreman has gotten through it's run(), and they would all have to be sent
@@ -389,8 +382,234 @@ public class TestDrillbitResilience extends ExecTest {
*/
@SuppressWarnings("static-method")
@Test
- @Ignore
- public void testForeman_runTryEnd() throws Exception {
+ public void foreman_runTryEnd() {
testForeman("run-try-end");
}
+
+ private static class WaitUntilCompleteListener implements UserResultsListener {
+ protected final CountDownLatch latch;
+ protected QueryId queryId = null;
+ protected Exception ex = null;
+ protected QueryState state = null;
+
+ public WaitUntilCompleteListener(final int count) {
+ latch = new CountDownLatch(count);
+ }
+
+ @Override
+ public void queryIdArrived(final QueryId queryId) {
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void submissionFailed(final UserException ex) {
+ this.ex = ex;
+ state = QueryState.FAILED;
+ latch.countDown();
+ }
+
+ @Override
+ public void queryCompleted(final QueryState state) {
+ this.state = state;
+ latch.countDown();
+ }
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ result.release();
+ }
+
+ public final Pair<QueryState, Exception> waitForCompletion() {
+ try {
+ latch.await();
+ } catch (final InterruptedException e) {
+ return new Pair<QueryState, Exception>(state, e);
+ }
+ return new Pair<>(state, ex);
+ }
+ }
+
+ private static class CancellingThread extends Thread {
+
+ private final QueryId queryId;
+
+ public CancellingThread(final QueryId queryId) {
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void run() {
+ final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
+ try {
+ cancelAck.checkedGet();
+ } catch (final RpcException e) {
+ fail(e.getMessage()); // currently this failure does not fail the test
+ }
+ }
+ }
+
+ /**
+ * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
+ */
+ private static void assertCancelled(final String controls, final WaitUntilCompleteListener listener) {
+ ControlsInjectionUtil.setControls(drillClient, controls);
+
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ assertTrue(result.getFirst() == QueryState.CANCELED);
+ assertTrue(result.getSecond() == null);
+ }
+
+ @Test // Cancellation TC 1
+ public void cancelBeforeAnyResultsArrive() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+
+ @Override
+ public void queryIdArrived(final QueryId queryId) {
+ (new CancellingThread(queryId)).start();
+ }
+ };
+
+ final String controls = "{\"injections\":[{"
+ + "\"type\":\"pause\"," +
+ "\"siteClass\":\"" + Foreman.class.getName() + "\","
+ + "\"desc\":\"pause-run-plan\","
+ + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1"
+ + "}]}";
+
+ assertCancelled(controls, listener);
+ }
+
+ @Test // Cancellation TC 2
+ public void cancelInMiddleOfFetchingResults() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ if (! cancelRequested) {
+ assertTrue(queryId != null);
+ (new CancellingThread(queryId)).start();
+ cancelRequested = true;
+ }
+ result.release();
+ }
+ };
+
+ final String controls = "{\"injections\":[{"
+ + "\"type\":\"pause\"," +
+ "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
+ + "\"desc\":\"sending-data\","
+ + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1"
+ + "}]}";
+
+ assertCancelled(controls, listener);
+ }
+
+
+ @Test // Cancellation TC 3
+ public void cancelAfterAllResultsProduced() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ private int count = 0;
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ if (++count == drillbits.size()) {
+ assertTrue(queryId != null);
+ (new CancellingThread(queryId)).start();
+ }
+ result.release();
+ }
+ };
+
+ final String controls = "{\"injections\":[{"
+ + "\"type\":\"pause\"," +
+ "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
+ + "\"desc\":\"send-complete\","
+ + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1"
+ + "}]}";
+
+ assertCancelled(controls, listener);
+ }
+
+ @Test // Cancellation TC 4
+ public void cancelAfterEverythingIsCompleted() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+ private int count = 0;
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ if (++count == drillbits.size()) {
+ assertTrue(queryId != null);
+ (new CancellingThread(queryId)).start();
+ }
+ result.release();
+ }
+ };
+
+ final String controls = "{\"injections\":[{"
+ + "\"type\":\"pause\"," +
+ "\"siteClass\":\"" + Foreman.class.getName() + "\","
+ + "\"desc\":\"foreman-cleanup\","
+ + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1"
+ + "}]}";
+
+ assertCancelled(controls, listener);
+ }
+
+ @Test // Completion TC 1
+ public void successfullyCompletes() {
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
+ QueryTestUtil.testWithListener(
+ drillClient, QueryType.SQL, TEST_QUERY, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ assertTrue(result.getFirst() == QueryState.COMPLETED);
+ assertTrue(result.getSecond() == null);
+ }
+
+ /**
+ * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
+ */
+ private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+ final String exceptionDesc) {
+ setExceptions(controls);
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ assertTrue(result.getFirst() == QueryState.FAILED);
+ final Exception e = result.getSecond();
+ assertExceptionInjected(e, exceptionClass, exceptionDesc);
+ }
+
+ @Test // Completion TC 2
+ public void failsWhenParsing() {
+ final String exceptionDesc = "sql-parsing";
+ final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
+ final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ }
+
+ @Test // Completion TC 3
+ public void failsWhenSendingFragments() {
+ final String exceptionDesc = "send-fragments";
+ final Class<? extends Throwable> exceptionClass = ForemanException.class;
+ final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ }
+
+ @Test // Completion TC 4
+ public void failsDuringExecution() {
+ final String exceptionDesc = "fragment-execution";
+ final Class<? extends Throwable> exceptionClass = IOException.class;
+ final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 3a794a9..0e80f91 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
@@ -67,7 +68,7 @@ public class ParquetResultListener implements UserResultsListener {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
checkLastChunk();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
index cfe52c2..e1b03d5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -96,7 +97,7 @@ public class TestParquetPhysicalPlan extends ExecTest {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
new file mode 100644
index 0000000..346c6dd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.testing.ExecutionControls.Controls;
+
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Static methods for constructing exception and pause injections for testing purposes.
+ */
+public class ControlsInjectionUtil {
+ /**
+ * Constructor. Prevent instantiation of static utility class.
+ */
+ private ControlsInjectionUtil() {
+ }
+
+ private static final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
+ @Override
+ public void increment(final UserSession session) {
+ session.incrementQueryCount(this);
+ }
+ };
+
+ public static void setControls(final DrillClient drillClient, final String controls) {
+ validateControlsString(controls);
+ try {
+ final List<QueryDataBatch> results = drillClient.runQuery(
+ UserBitShared.QueryType.SQL, String.format("alter session set `%s` = '%s'",
+ ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls));
+ for (final QueryDataBatch data : results) {
+ data.release();
+ }
+ } catch (RpcException e) {
+ fail("Could not set controls options: " + e.toString());
+ }
+ }
+
+ public static void setControls(final UserSession session, final String controls) {
+ validateControlsString(controls);
+ final OptionValue opValue = OptionValue.createString(OptionValue.OptionType.SESSION,
+ ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
+
+ final OptionManager options = session.getOptions();
+ try {
+ options.getAdmin().validate(opValue);
+ options.setOption(opValue);
+ } catch (Exception e) {
+ fail("Could not set controls options: " + e.getMessage());
+ }
+ incrementer.increment(session); // to simulate that a query completed
+ }
+
+ private static void validateControlsString(final String controls) {
+ try {
+ ExecutionControls.controlsOptionMapper.readValue(controls, Controls.class);
+ } catch (Exception e) {
+ fail("Could not validate controls JSON: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Clears all the controls.
+ */
+ public static void clearControls(final DrillClient client) {
+ setControls(client, ExecutionControls.DEFAULT_CONTROLS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
deleted file mode 100644
index bf93dee..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Static methods for constructing exception injections for testing purposes.
- */
-public class ExceptionInjectionUtil {
- /**
- * Constructor. Prevent instantiation of static utility class.
- */
- private ExceptionInjectionUtil() {
- }
-
- /**
- * Add a set of injections to a drillbit.
- *
- * @param drillbit the drillbit
- * @param injections the JSON-specified injections
- */
- public static void setInjections(final Drillbit drillbit, final String injections) {
- final DrillbitContext drillbitContext = drillbit.getContext();
- final OptionValue stringValue = OptionValue.createString(
- OptionValue.OptionType.SYSTEM, ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, injections);
- final OptionManager optionManager = drillbitContext.getOptionManager();
- optionManager.setOption(stringValue);
- }
-
- /**
- * Add a set of injections to a drillbit.
- *
- * @param drillbit the drillbit
- * @param injectionOptions the injections, specified using the parsing POJOs
- */
- public static void setInjections(final Drillbit drillbit, final InjectionOptions injectionOptions) {
- final ObjectMapper objectMapper = new ObjectMapper();
- final StringWriter stringWriter = new StringWriter();
- try {
- objectMapper.writeValue(stringWriter, injectionOptions);
- } catch(IOException e) {
- throw new RuntimeException("Couldn't serialize injectionOptions to JSON", e);
- }
-
- setInjections(drillbit, stringWriter.toString());
- }
-
- /**
- * Clear all injections on a drillbit.
- *
- * @param drillbit the drillbit
- */
- public static void clearInjections(final Drillbit drillbit) {
- setInjections(drillbit, "");
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index d0c0279..2cba992 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -17,31 +17,41 @@
*/
package org.apache.drill.exec.testing;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
+import org.apache.drill.exec.server.RemoteServiceSet;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class TestExceptionInjection extends BaseTestQuery {
- private final static String NO_THROW_FAIL = "Didn't throw expected exception";
+ private static final String NO_THROW_FAIL = "Didn't throw expected exception";
+
+ private static final UserSession session = UserSession.Builder.newBuilder()
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
/**
- * Class whose methods we want to simulate exceptions at run-time for testing
- * purposes.
+ * Class whose methods we want to simulate runtime at run-time for testing
+ * purposes. The class must have access to QueryId, UserSession and DrillbitEndpoint.
+ * For instance, these are accessible from {@link org.apache.drill.exec.ops.QueryContext}.
*/
- public static class DummyClass {
- private final static ExceptionInjector injector = ExceptionInjector.getInjector(DummyClass.class);
- private final DrillbitContext drillbitContext;
+ private static class DummyClass {
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+ private final QueryContext context;
- public DummyClass(final DrillbitContext drillbitContext) {
- this.drillbitContext = drillbitContext;
+ public DummyClass(final QueryContext context) {
+ this.context = context;
}
/**
@@ -53,7 +63,7 @@ public class TestExceptionInjection extends BaseTestQuery {
// ... code ...
// simulated unchecked exception
- injector.injectUnchecked(drillbitContext, desc);
+ injector.injectUnchecked(context.getExecutionControls(), desc);
// ... code ...
}
@@ -69,7 +79,7 @@ public class TestExceptionInjection extends BaseTestQuery {
// ... code ...
// simulated IOException
- injector.injectChecked(drillbitContext, THROWS_IOEXCEPTION, IOException.class);
+ injector.injectChecked(context.getExecutionControls(), THROWS_IOEXCEPTION, IOException.class);
// ... code ...
}
@@ -77,36 +87,30 @@ public class TestExceptionInjection extends BaseTestQuery {
@SuppressWarnings("static-method")
@Test
- public void testNoInjection() throws Exception {
- test("select * from sys.drillbits");
- }
-
- private static void setInjections(final String jsonInjections) {
- for(Drillbit bit : bits) {
- ExceptionInjectionUtil.setInjections(bit, jsonInjections);
- }
+ public void noInjection() throws Exception {
+ test("select * from sys.memory");
}
@SuppressWarnings("static-method")
@Test
- public void testEmptyInjection() throws Exception {
- setInjections("{\"injections\":[]}");
- test("select * from sys.drillbits");
+ public void emptyInjection() throws Exception {
+ ControlsInjectionUtil.setControls(session, "{\"injections\":[]}");
+ test("select * from sys.memory");
}
/**
* Assert that DummyClass.descPassThroughMethod does indeed throw the expected exception.
*
- * @param dummyClass the instance of DummyClass
+ * @param dummyClass the instance of DummyClass
* @param exceptionClassName the expected exception
- * @param exceptionDesc the expected exception site description
+ * @param exceptionDesc the expected exception site description
*/
private static void assertPassthroughThrows(
- final DummyClass dummyClass, final String exceptionClassName, final String exceptionDesc) {
+ final DummyClass dummyClass, final String exceptionClassName, final String exceptionDesc) {
try {
dummyClass.descPassthroughMethod(exceptionDesc);
fail(NO_THROW_FAIL);
- } catch(Exception e) {
+ } catch (Exception e) {
assertEquals(exceptionClassName, e.getClass().getName());
assertEquals(exceptionDesc, e.getMessage());
}
@@ -114,79 +118,192 @@ public class TestExceptionInjection extends BaseTestQuery {
@SuppressWarnings("static-method")
@Test
- public void testUncheckedStringInjection() {
- // set injections via a string
+ public void uncheckedInjection() {
+ // set exceptions via a string
final String exceptionDesc = "<<injected from descPassthroughMethod()>>";
final String exceptionClassName = "java.lang.RuntimeException";
final String jsonString = "{\"injections\":[{"
- + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
- + "\"desc\":\"" + exceptionDesc + "\","
- + "\"nSkip\":0,"
- + "\"nFire\":1,"
- + "\"exceptionClass\":\"" + exceptionClassName + "\""
- + "}]}";
- setInjections(jsonString);
+ + "\"type\":\"exception\"," +
+ "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+ + "\"desc\":\"" + exceptionDesc + "\","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1,"
+ + "\"exceptionClass\":\"" + exceptionClassName + "\""
+ + "}]}";
+ ControlsInjectionUtil.setControls(session, jsonString);
+
+ final QueryContext context = new QueryContext(session, bits[0].getContext());
// test that the exception gets thrown
- final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+ final DummyClass dummyClass = new DummyClass(context);
assertPassthroughThrows(dummyClass, exceptionClassName, exceptionDesc);
+ try {
+ context.close();
+ } catch (Exception e) {
+ fail();
+ }
}
- private static InjectionOptions buildDefaultJson() {
- final InjectionOption injectionOption = new InjectionOption();
- injectionOption.siteClass = "org.apache.drill.exec.testing.TestExceptionInjection$DummyClass";
- injectionOption.desc = DummyClass.THROWS_IOEXCEPTION;
- injectionOption.nSkip = 0;
- injectionOption.nFire = 1;
- injectionOption.exceptionClass = "java.io.IOException";
- final InjectionOptions injectionOptions = new InjectionOptions();
- injectionOptions.injections = new InjectionOption[1];
- injectionOptions.injections[0] = injectionOption;
- return injectionOptions;
+ private static String createException(final String desc, final int nSkip, final int nFire,
+ final String exceptionClass) {
+ return "{\"injections\":[{"
+ + "\"type\":\"exception\","
+ + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+ + "\"desc\":\"" + desc + "\","
+ + "\"nSkip\": " + nSkip + ","
+ + "\"nFire\": " + nFire + ","
+ + "\"exceptionClass\":\"" + exceptionClass + "\""
+ + "}]}";
+ }
+
+ private static String createExceptionOnBit(final DrillbitEndpoint endpoint, final String desc, final int nSkip,
+ final int nFire, final String exceptionClass) {
+ return "{\"injections\":[{"
+ + "\"address\":\"" + endpoint.getAddress() + "\","
+ + "\"port\":\"" + endpoint.getUserPort() + "\","
+ + "\"type\":\"exception\","
+ + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+ + "\"desc\":\"" + desc + "\","
+ + "\"nSkip\": " + nSkip + ","
+ + "\"nFire\": " + nFire + ","
+ + "\"exceptionClass\":\"" + exceptionClass + "\""
+ + "}]}";
}
@SuppressWarnings("static-method")
@Test
- public void testCheckedJsonInjection() {
+ public void checkedInjection() {
// set the injection via the parsing POJOs
- final InjectionOptions injectionOptions = buildDefaultJson();
- ExceptionInjectionUtil.setInjections(bits[0], injectionOptions);
+ final String controls = createException(DummyClass.THROWS_IOEXCEPTION, 0, 1, IOException.class.getName());
+ ControlsInjectionUtil.setControls(session, controls);
+
+ final QueryContext context = new QueryContext(session, bits[0].getContext());
// test that the expected exception (checked) gets thrown
- final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+ final DummyClass dummyClass = new DummyClass(context);
try {
dummyClass.throwsIOException();
fail(NO_THROW_FAIL);
- } catch(IOException e) {
+ } catch (IOException e) {
assertEquals(DummyClass.THROWS_IOEXCEPTION, e.getMessage());
}
+ try {
+ context.close();
+ } catch (Exception e) {
+ fail();
+ }
}
@SuppressWarnings("static-method")
@Test
- public void testSkipAndLimit() {
+ public void skipAndLimit() {
final String passthroughDesc = "<<injected from descPassthrough>>";
- final InjectionOptions injectionOptions = buildDefaultJson();
- final InjectionOption injectionOption = injectionOptions.injections[0];
- injectionOption.desc = passthroughDesc;
- injectionOption.nSkip = 7;
- injectionOption.nFire = 3;
- injectionOption.exceptionClass = RuntimeException.class.getName();
- ExceptionInjectionUtil.setInjections(bits[0], injectionOptions);
+ final int nSkip = 7;
+ final int nFire = 3;
+ final String exceptionClass = RuntimeException.class.getName();
+ final String controls = createException(passthroughDesc, nSkip, nFire, exceptionClass);
+ ControlsInjectionUtil.setControls(session, controls);
+
+ final QueryContext context = new QueryContext(session, bits[0].getContext());
- final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+ final DummyClass dummyClass = new DummyClass(context);
// these shouldn't throw
- for(int i = 0; i < injectionOption.nSkip; ++i) {
+ for (int i = 0; i < nSkip; ++i) {
dummyClass.descPassthroughMethod(passthroughDesc);
}
// these should throw
- for(int i = 0; i < injectionOption.nFire; ++i) {
- assertPassthroughThrows(dummyClass, injectionOption.exceptionClass, passthroughDesc);
+ for (int i = 0; i < nFire; ++i) {
+ assertPassthroughThrows(dummyClass, exceptionClass, passthroughDesc);
}
// this shouldn't throw
dummyClass.descPassthroughMethod(passthroughDesc);
+ try {
+ context.close();
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ @SuppressWarnings("static-method")
+ @Test
+ public void injectionOnSpecificBit() {
+ final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+ final ZookeeperHelper zkHelper = new ZookeeperHelper();
+ zkHelper.startZookeeper(1);
+
+ // Creating two drillbits
+ final Drillbit drillbit1, drillbit2;
+ final DrillConfig drillConfig = zkHelper.getConfig();
+ try {
+ drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+ drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+ } catch (DrillbitStartupException e) {
+ throw new RuntimeException("Failed to start drillbits.", e);
+ }
+
+ final DrillbitContext drillbitContext1 = drillbit1.getContext();
+ final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+ final UserSession session = UserSession.Builder.newBuilder()
+ .withOptionManager(drillbitContext1.getOptionManager())
+ .build();
+
+ final String passthroughDesc = "<<injected from descPassthrough>>";
+ final int nSkip = 7;
+ final int nFire = 3;
+ final String exceptionClass = RuntimeException.class.getName();
+ // only drillbit1's (address, port)
+ final String controls = createExceptionOnBit(drillbitContext1.getEndpoint(), passthroughDesc, nSkip, nFire,
+ exceptionClass);
+
+ ControlsInjectionUtil.setControls(session, controls);
+
+ {
+ final QueryContext queryContext1 = new QueryContext(session, drillbitContext1);
+ final DummyClass class1 = new DummyClass(queryContext1);
+
+ // these shouldn't throw
+ for (int i = 0; i < nSkip; ++i) {
+ class1.descPassthroughMethod(passthroughDesc);
+ }
+
+ // these should throw
+ for (int i = 0; i < nFire; ++i) {
+ assertPassthroughThrows(class1, exceptionClass, passthroughDesc);
+ }
+
+ // this shouldn't throw
+ class1.descPassthroughMethod(passthroughDesc);
+ try {
+ queryContext1.close();
+ } catch (Exception e) {
+ fail();
+ }
+ }
+ {
+ final QueryContext queryContext2 = new QueryContext(session, drillbitContext2);
+ final DummyClass class2 = new DummyClass(queryContext2);
+
+ // these shouldn't throw
+ for (int i = 0; i < nSkip; ++i) {
+ class2.descPassthroughMethod(passthroughDesc);
+ }
+
+ // these shouldn't throw
+ for (int i = 0; i < nFire; ++i) {
+ class2.descPassthroughMethod(passthroughDesc);
+ }
+
+ // this shouldn't throw
+ class2.descPassthroughMethod(passthroughDesc);
+ try {
+ queryContext2.close();
+ } catch (Exception e) {
+ fail();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
new file mode 100644
index 0000000..1c219f0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestPauseInjection extends BaseTestQuery {
+
+ private static final UserSession session = UserSession.Builder.newBuilder()
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
+
+ /**
+ * Class whose methods we want to simulate pauses at run-time for testing
+ * purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
+ * {@link org.apache.drill.exec.ops.FragmentContext}.
+ */
+ private static class DummyClass {
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(DummyClass.class);
+ private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+
+ private final QueryContext context;
+
+ public DummyClass(final QueryContext context) {
+ this.context = context;
+ }
+
+ public static final String PAUSES = "<<pauses>>";
+
+ /**
+ * Method that pauses.
+ *
+ * @return how long the method paused in milliseconds
+ */
+ public long pauses() {
+ // ... code ...
+
+ final long startTime = System.currentTimeMillis();
+ // simulated pause
+ injector.injectPause(context.getExecutionControls(), PAUSES, logger);
+ final long endTime = System.currentTimeMillis();
+
+ // ... code ...
+ return (endTime - startTime);
+ }
+ }
+
+ @Test
+ public void pauseInjected() {
+ final long pauseMillis = 1000L;
+ final String jsonString = "{\"injections\":[{"
+ + "\"type\":\"pause\"," +
+ "\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+ + "\"desc\":\"" + DummyClass.PAUSES + "\","
+ + "\"millis\":" + pauseMillis + ","
+ + "\"nSkip\":0,"
+ + "\"nFire\":1"
+ + "}]}";
+
+ ControlsInjectionUtil.setControls(session, jsonString);
+
+ final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+
+ // test that the pause happens
+ final DummyClass dummyClass = new DummyClass(queryContext);
+ final long time = dummyClass.pauses();
+ assertTrue((time >= pauseMillis));
+ try {
+ queryContext.close();
+ } catch (Exception e) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index d2302fb..484a5e5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -32,6 +32,7 @@ import net.hydromatic.avatica.AvaticaStatement;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
@@ -191,7 +192,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
releaseIfFirst();
completed = true;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/protocol/readme.txt
----------------------------------------------------------------------
diff --git a/protocol/readme.txt b/protocol/readme.txt
index bd516d3..6f502c4 100644
--- a/protocol/readme.txt
+++ b/protocol/readme.txt
@@ -8,6 +8,6 @@ To regenerate the sources after making changes to .proto files
in your PATH (you may need to download and build it first). You can
download it from http://code.google.com/p/protobuf/downloads/list.
-2. Run "mvn process-sources -P proto-compile".
+2. In protocol dir, run "mvn process-sources -P proto-compile" or "mvn clean install -P proto-compile".
3. Check in the new/updated files.
\ No newline at end of file
[2/2] drill git commit: DRILL-2383: Support to inject exceptions and
pauses in various components of Drill + Controls are fired only if assertions
are enabled + Controls can be introduced in any class that has access to
FragmentContext/QueryContext + Con
Posted by pa...@apache.org.
DRILL-2383: Support to inject exceptions and pauses in various components of Drill
+ Controls are fired only if assertions are enabled
+ Controls can be introduced in any class that has access to FragmentContext/QueryContext
+ Controls can be fired by altering the DRILLBIT_CONTROL_INJECTIONS session option
+ Renames: SimulatedExceptions => ExecutionControls, ExceptionInjector => ExecutionControlsInjector
+ Added injection sites in Foreman, DrillSqlWorker, FragmentExecutor
+ Unit tests in TestDrillbitResilience, TestExceptionInjection and TestPauseInjection
Other commits included:
+ DRILL-2437: Moved ExecutionControls from DrillbitContext to FragmentContext/QueryContext
+ DRILL-2382: Added address and port to Injection to specify drillbit
+ DRILL-2384: Added QueryState to SingleRowListener and assert that state is COMPLETED while testing
Other edits:
+ Support for short lived session options in SessionOptionManager (using TTL in OptionValidator)
+ Introduced query count in UserSession
+ Added QueryState to queryCompleted() in UserResultsListener to check if COMPLETED/CANCELED
+ Added JSONStringValidator to TypeValidators
+ Log query id as string in DrillClient, WorkEventBus, QueryResultHandler
+ Use try..catch block only around else clause for OptionList in FragmentContext
+ Fixed drillbitContext spelling error in QueryContext
+ Fixed state transition when cancel() before run() in FragmentExecutor
+ Do not call setLocalOption twice in FallbackOptionManager
+ Show explicitly that submitWork() returns queryId in UserServer
+ Updated protocol/readme.txt to include an alternative way to generate sources
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be8d9539
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be8d9539
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be8d9539
Branch: refs/heads/master
Commit: be8d953935461ee6567b0c4d96c503e8b04469d2
Parents: 21dfe7a
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri Apr 17 09:46:11 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Mon Apr 20 19:58:53 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 7 +-
.../apache/drill/exec/client/DrillClient.java | 7 +-
.../exec/client/PrintingResultsListener.java | 3 +-
.../apache/drill/exec/ops/FragmentContext.java | 26 +-
.../org/apache/drill/exec/ops/QueryContext.java | 14 +-
.../drill/exec/physical/impl/ScreenCreator.java | 4 +
.../drill/exec/planner/sql/DrillSqlWorker.java | 6 +-
.../drill/exec/rpc/control/WorkEventBus.java | 20 +-
.../drill/exec/rpc/user/QueryResultHandler.java | 35 +-
.../exec/rpc/user/UserResultsListener.java | 4 +-
.../apache/drill/exec/rpc/user/UserServer.java | 10 +-
.../apache/drill/exec/rpc/user/UserSession.java | 26 +-
.../drill/exec/server/DrillbitContext.java | 5 -
.../server/options/FallbackOptionManager.java | 2 -
.../exec/server/options/OptionManager.java | 1 +
.../exec/server/options/OptionValidator.java | 26 ++
.../server/options/SessionOptionManager.java | 75 ++-
.../server/options/SystemOptionManager.java | 7 +-
.../exec/server/options/TypeValidators.java | 31 ++
.../drill/exec/server/rest/QueryWrapper.java | 3 +-
.../drill/exec/testing/ExceptionInjection.java | 84 ++--
.../drill/exec/testing/ExceptionInjector.java | 112 -----
.../drill/exec/testing/ExecutionControls.java | 193 ++++++++
.../exec/testing/ExecutionControlsInjector.java | 129 ++++++
.../apache/drill/exec/testing/Injection.java | 84 ++++
.../InjectionConfigurationException.java | 35 ++
.../drill/exec/testing/InjectionSite.java | 40 +-
.../exec/testing/NoOpControlsInjector.java | 48 ++
.../drill/exec/testing/PauseInjection.java | 63 +++
.../drill/exec/testing/SimulatedExceptions.java | 164 -------
.../apache/drill/exec/work/foreman/Foreman.java | 14 +-
.../exec/work/fragment/FragmentExecutor.java | 55 ++-
.../apache/drill/exec/work/user/UserWorker.java | 9 +
.../java/org/apache/drill/BaseTestQuery.java | 3 +-
.../java/org/apache/drill/PlanningBase.java | 8 +-
.../org/apache/drill/SingleRowListener.java | 3 +-
.../exec/compile/TestClassTransformation.java | 6 +-
.../exec/server/TestDrillbitResilience.java | 461 ++++++++++++++-----
.../store/parquet/ParquetResultListener.java | 3 +-
.../store/parquet/TestParquetPhysicalPlan.java | 3 +-
.../exec/testing/ControlsInjectionUtil.java | 95 ++++
.../exec/testing/ExceptionInjectionUtil.java | 82 ----
.../exec/testing/TestExceptionInjection.java | 257 ++++++++---
.../drill/exec/testing/TestPauseInjection.java | 96 ++++
.../drill/jdbc/impl/DrillResultSetImpl.java | 3 +-
protocol/readme.txt | 2 +-
46 files changed, 1682 insertions(+), 682 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7d89ac9..f7648b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
+import org.apache.drill.exec.testing.ExecutionControls;
public interface ExecConstants {
public static final String ZK_RETRY_TIMES = "drill.exec.zk.retry.count";
@@ -216,7 +217,7 @@ public interface ExecConstants {
public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
- public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
- public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
- new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
+ public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
+ public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
+ new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 336a149..0d29f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.Property;
@@ -326,7 +327,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
future.set(results);
}
@@ -352,7 +353,9 @@ public class DrillClient implements Closeable, ConnectionThrottle {
@Override
public void queryIdArrived(QueryId queryId) {
- logger.debug( "Query ID arrived: {}", queryId );
+ if (logger.isDebugEnabled()) {
+ logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 2bf35b1..64e7266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -60,7 +61,7 @@ public class PrintingResultsListener implements UserResultsListener {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
allocator.close();
latch.countDown();
System.out.println("Total rows returned: " + count.get());
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index c46613d..9400355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +64,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
- private final UserClientConnection connection;
+ private final UserClientConnection connection; // is null if attached to non-root fragment
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
@@ -73,6 +74,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private final OptionManager fragmentOptions;
private final BufferManager bufferManager;
private ExecutorState executorState;
+ private final ExecutionControls executionControls;
private final SendingAccountor sendingAccountor = new SendingAccountor();
private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -98,17 +100,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
- try {
- final OptionList list;
- if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
- list = new OptionList();
- } else {
+ final OptionList list;
+ if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
+ list = new OptionList();
+ } else {
+ try {
list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+ } catch (final Exception e) {
+ throw new ExecutionSetupException("Failure while reading plan options.", e);
}
- fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
- } catch (final Exception e) {
- throw new ExecutionSetupException("Failure while reading plan options.", e);
}
+ fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+
+ executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
@@ -288,6 +292,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
allocator.setFragmentLimit(limit);
}
+ public ExecutionControls getExecutionControls() {
+ return executionControls;
+ }
+
@Override
public void close() {
waitForSendComplete();
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2fa0b18..2dcac25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
import org.apache.drill.exec.store.PartitionExplorerImpl;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControls;
// TODO except for a couple of tests, this is only created by Foreman
// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -52,6 +53,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
private final OptionManager queryOptions;
private final PlannerSettings plannerSettings;
private final DrillOperatorTable table;
+ private final ExecutionControls executionControls;
private final BufferAllocator allocator;
private final BufferManager bufferManager;
@@ -65,10 +67,11 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
*/
private boolean closed = false;
- public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
- this.drillbitContext = drllbitContext;
+ public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
+ this.drillbitContext = drillbitContext;
this.session = session;
queryOptions = new QueryOptionManager(session.getOptions());
+ executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
table = new DrillOperatorTable(getFunctionRegistry());
@@ -78,7 +81,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
try {
- allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
+ allocator = drillbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
} catch (OutOfMemoryException e) {
throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
@@ -87,7 +90,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
bufferManager = new BufferManager(this.allocator, null);
}
-
public PlannerSettings getPlannerSettings() {
return plannerSettings;
}
@@ -120,6 +122,10 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
return queryOptions;
}
+ public ExecutionControls getExecutionControls() {
+ return executionControls;
+ }
+
public DrillbitEndpoint getCurrentEndpoint() {
return drillbitContext.getEndpoint();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 6b3caf4..2069d35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -34,9 +34,11 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import com.google.common.base.Preconditions;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class ScreenCreator implements RootCreator<Screen>{
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ScreenCreator.class);
@@ -107,6 +109,7 @@ public class ScreenCreator implements RootCreator<Screen>{
materializer = new VectorRecordMaterializer(context, incoming);
//$FALL-THROUGH$
case OK:
+ injector.injectPause(context.getExecutionControls(), "sending-data", logger);
QueryWritableBatch batch = materializer.convertNext();
updateStats(batch);
stats.startWait();
@@ -139,6 +142,7 @@ public class ScreenCreator implements RootCreator<Screen>{
if (!oContext.isClosed()) {
internalStop();
}
+ injector.injectPause(context.getExecutionControls(), "send-complete", logger);
}
RecordBatch getIncoming() {
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 9ca64d8..097b7bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -45,7 +45,9 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.rules.ReduceExpressionsRule;
@@ -60,7 +62,8 @@ import org.eigenbase.sql.parser.SqlParseException;
import org.eigenbase.sql.parser.SqlParser;
public class DrillSqlWorker {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DrillSqlWorker.class);
private final Planner planner;
private final HepPlanner hepPlanner;
@@ -119,6 +122,7 @@ public class DrillSqlWorker {
public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
SqlNode sqlNode;
try {
+ injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
sqlNode = planner.parse(sql);
} catch (SqlParseException e) {
throw new QueryInputException("Failure parsing SQL. " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index a5a5441..d90096a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -45,13 +45,17 @@ public class WorkEventBus {
.build();
public void removeFragmentStatusListener(final QueryId queryId) {
- logger.debug("Removing fragment status listener for queryId {}.", queryId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+ }
listeners.remove(queryId);
}
public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener)
throws ForemanSetupException {
- logger.debug("Adding fragment status listener for queryId {}.", queryId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+ }
final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
if (old != null) {
throw new ForemanSetupException (
@@ -69,7 +73,9 @@ public class WorkEventBus {
}
public void addFragmentManager(final FragmentManager fragmentManager) {
- logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+ }
final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
if (old != null) {
throw new IllegalStateException(
@@ -84,7 +90,9 @@ public class WorkEventBus {
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
// check if this was a recently canceled fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ }
return null;
}
@@ -98,7 +106,9 @@ public class WorkEventBus {
}
public void removeFragmentManager(final FragmentHandle handle) {
- logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+ }
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 3c807d5..3beae89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
@@ -114,7 +115,7 @@ public class QueryResultHandler {
// A successful completion/canceled case--pass on via resultArrived
try {
- resultsListener.queryCompleted();
+ resultsListener.queryCompleted(queryState);
} catch ( Exception e ) {
resultsListener.submissionFailed(UserException.systemError(e).build());
}
@@ -198,8 +199,8 @@ public class QueryResultHandler {
private static class BufferingResultsListener implements UserResultsListener {
private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
- private volatile boolean finished = false;
private volatile UserException ex;
+ private volatile QueryState queryState;
private volatile UserResultsListener output;
private volatile ConnectionThrottle throttle;
@@ -212,20 +213,22 @@ public class QueryResultHandler {
if (ex != null) {
l.submissionFailed(ex);
return true;
- } else if (finished) {
- l.queryCompleted();
+ } else if (queryState != null) {
+ l.queryCompleted(queryState);
+ return true;
}
- return finished;
+ return false;
}
}
@Override
- public void queryCompleted() {
- finished = true;
+ public void queryCompleted(QueryState state) {
+ assert queryState == null;
+ this.queryState = state;
synchronized (this) {
if (output != null) {
- output.queryCompleted();
+ output.queryCompleted(state);
}
}
}
@@ -245,7 +248,11 @@ public class QueryResultHandler {
@Override
public void submissionFailed(UserException ex) {
- finished = true;
+ assert queryState == null;
+ // there is one case when submissionFailed() is called even though the query didn't fail on the server side
+ // it happens when UserResultsListener.batchArrived() throws an exception that will be passed to
+ // submissionFailed() by QueryResultHandler.dataArrived()
+ queryState = QueryState.FAILED;
synchronized (this) {
if (output == null) {
this.ex = ex;
@@ -255,10 +262,6 @@ public class QueryResultHandler {
}
}
- public boolean isFinished() {
- return finished;
- }
-
@Override
public void queryIdArrived(QueryId queryId) {
}
@@ -281,8 +284,10 @@ public class QueryResultHandler {
@Override
public void success(QueryId queryId, ByteBuf buf) {
resultsListener.queryIdArrived(queryId);
- logger.debug("Received QueryId {} successfully. Adding results listener {}.",
- queryId, resultsListener);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received QueryId {} successfully. Adding results listener {}.",
+ QueryIdHelper.getQueryId(queryId), resultsListener);
+ }
UserResultsListener oldListener =
queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index f928476..e422a3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.user;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
public interface UserResultsListener {
@@ -38,8 +39,9 @@ public interface UserResultsListener {
/**
* The query has completed (successsful completion or cancellation). The listener will not receive any other
* data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED)
+ * @param state
*/
- void queryCompleted();
+ void queryCompleted(QueryState state);
/**
* A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 877bc08..9e929de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -97,16 +97,18 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
case RpcType.RUN_QUERY_VALUE:
logger.debug("Received query to run. Returning query handle.");
try {
- RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
- return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
+ final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ final QueryId queryId = worker.submitWork(connection, query);
+ return new Response(RpcType.QUERY_HANDLE, queryId);
} catch (InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding RunQuery body.", e);
}
case RpcType.CANCEL_QUERY_VALUE:
try {
- QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
- return new Response(RpcType.ACK, worker.cancelQuery(queryId));
+ final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ final Ack ack = worker.cancelQuery(queryId);
+ return new Response(RpcType.ACK, ack);
} catch (InvalidProtocolBufferException e) {
throw new RpcException("Failure while decoding QueryId body.", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 19d77b0..e631792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.rpc.user;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import net.hydromatic.optiq.SchemaPlus;
@@ -42,6 +43,16 @@ public class UserSession {
private UserCredentials credentials;
private Map<String, String> properties;
private OptionManager sessionOptions;
+ private final AtomicInteger queryCount;
+
+ /**
+ * Implementations of this interface are allowed to increment queryCount.
+ * {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface.
+ * No other core class should implement this interface. Test classes may implement (see ControlsInjectionUtil).
+ */
+ public static interface QueryCountIncrementer {
+ public void increment(final UserSession session);
+ }
public static class Builder {
UserSession userSession;
@@ -56,7 +67,7 @@ public class UserSession {
}
public Builder withOptionManager(OptionManager systemOptions) {
- userSession.sessionOptions = new SessionOptionManager(systemOptions);
+ userSession.sessionOptions = new SessionOptionManager(systemOptions, userSession);
return this;
}
@@ -87,7 +98,9 @@ public class UserSession {
}
}
- private UserSession() { }
+ private UserSession() {
+ queryCount = new AtomicInteger(0);
+ }
public boolean isSupportComplexTypes() {
return supportComplexTypes;
@@ -105,6 +118,15 @@ public class UserSession {
return credentials;
}
+ public void incrementQueryCount(final QueryCountIncrementer incrementer) {
+ assert incrementer != null;
+ queryCount.incrementAndGet();
+ }
+
+ public int getQueryCount() {
+ return queryCount.get();
+ }
+
/**
* Update the schema path for the session.
* @param fullPath The desired path to set to.
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index dbf3c74..6fdbfca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.testing.SimulatedExceptions;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
@@ -59,7 +58,6 @@ public class DrillbitContext {
private final PStoreProvider provider;
private final CodeCompiler compiler;
private final ExecutorService executor;
- private final SimulatedExceptions simulatedExceptions = new SimulatedExceptions();
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord,
Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
@@ -164,7 +162,4 @@ public class DrillbitContext {
return executor;
}
- public SimulatedExceptions getSimulatedExceptions() {
- return simulatedExceptions;
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
index 4e90616..682bfea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -67,8 +67,6 @@ public abstract class FallbackOptionManager extends BaseOptionManager {
private void setValidatedOption(OptionValue value) {
if (!setLocalOption(value)) {
fallback.setOption(value);
- }else{
- setLocalOption(value);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
index 0b8811a..0fed1fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -39,6 +39,7 @@ public interface OptionManager extends Iterable<OptionValue> {
public interface OptionAdmin {
public void registerOptionType(OptionValidator validator);
+ public OptionValidator getValidator(String name);
public void validate(OptionValue v) throws SetOptionException;
public OptionValue validate(String name, SqlLiteral value, OptionType optionType) throws SetOptionException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
index 43071e7..90ce3a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -53,6 +53,32 @@ public abstract class OptionValidator {
return optionName;
}
+ /**
+ * This function returns true if and only if the validator is meant for a short-lived option.
+ *
+ * NOTE: By default, options are not short-lived. So, if a derived class is meant for a short-lived option,
+ * that class must do two things:
+ * (1) override this method to return true, and
+ * (2) return the number of queries for which the option is valid through {@link #getTtl}.
+ * E.g. {@link org.apache.drill.exec.testing.ExecutionControls.ControlsOptionValidator}
+ * @return if this validator is for a short-lived option
+ */
+ public boolean isShortLived() {
+ return false;
+ }
+
+ /**
+ * If an option is short-lived, this returns the number of queries for which the option is valid.
+ * Please read the note at {@link #isShortLived}
+ * @return number of queries for which the option should be valid
+ */
+ public int getTtl() {
+ if (!isShortLived()) {
+ throw new UnsupportedOperationException("This option is not short-lived.");
+ }
+ return 0;
+ }
+
public String getDefaultString() {
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index c3de190..340358f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,13 +17,86 @@
*/
package org.apache.drill.exec.server.options;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
public class SessionOptionManager extends InMemoryOptionManager {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
- public SessionOptionManager(OptionManager systemOptions) {
+ private final UserSession session;
+
+ /**
+ * Map of short lived options. Key: option name, Value: [ start, end )
+ */
+ private final ConcurrentHashMap<String, ImmutablePair<Integer, Integer>> shortLivedOptions = new ConcurrentHashMap<>();
+
+ public SessionOptionManager(final OptionManager systemOptions, final UserSession session) {
super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
+ this.session = session;
+ }
+
+ @Override
+ boolean setLocalOption(final OptionValue value) {
+ final boolean set = super.setLocalOption(value);
+ final String name = value.name;
+ final OptionValidator validator = fallback.getAdmin().getValidator(name);
+ final boolean shortLived = validator.isShortLived();
+ if (set && shortLived) {
+ final int start = session.getQueryCount() + 1; // start from the next query
+ final int ttl = validator.getTtl();
+ final int end = start + ttl;
+ shortLivedOptions.put(name, new ImmutablePair<>(start, end));
+ }
+ return set;
+ }
+
+ @Override
+ OptionValue getLocalOption(final String name) {
+ final OptionValue value = options.get(name);
+ if (shortLivedOptions.containsKey(name)) {
+ if (withinRange(value)) {
+ return value;
+ }
+ final int queryNumber = session.getQueryCount();
+ final int start = shortLivedOptions.get(name).getLeft();
+ // option is not in effect if queryNumber < start
+ if (queryNumber < start) {
+ return fallback.getAdmin().getValidator(name).getDefault();
+ // reset if queryNumber <= end
+ } else {
+ options.remove(name);
+ shortLivedOptions.remove(name);
+ return null; // fallback takes effect
+ }
+ }
+ return value;
+ }
+
+ private boolean withinRange(final OptionValue value) {
+ final int queryNumber = session.getQueryCount();
+ final ImmutablePair<Integer, Integer> pair = shortLivedOptions.get(value.name);
+ final int start = pair.getLeft();
+ final int end = pair.getRight();
+ return start <= queryNumber && queryNumber < end;
+ }
+
+ private final Predicate<OptionValue> isLive = new Predicate<OptionValue>() {
+ @Override
+ public boolean apply(final OptionValue value) {
+ final String name = value.name;
+ return !shortLivedOptions.containsKey(name) || withinRange(value);
+ }
+ };
+
+ @Override
+ Iterable<OptionValue> optionIterable() {
+ final Collection<OptionValue> liveOptions = Collections2.filter(options.values(), isLive);
+ return liveOptions;
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4471d4f..a745479 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -101,7 +101,7 @@ public class SystemOptionManager extends BaseOptionManager {
QueryClassLoader.JAVA_COMPILER_DEBUG,
ExecConstants.ENABLE_VERBOSE_ERRORS,
ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
- ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR,
+ ExecConstants.DRILLBIT_CONTROLS_VALIDATOR,
ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
};
@@ -216,6 +216,11 @@ public class SystemOptionManager extends BaseOptionManager {
}
@Override
+ public OptionValidator getValidator(final String name) {
+ return knownOptions.get(name);
+ }
+
+ @Override
public void validate(final OptionValue v) throws SetOptionException {
final OptionValidator validator = knownOptions.get(v.name);
if (validator == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index b9721cc..e7b1eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -17,10 +17,12 @@
*/
package org.apache.drill.exec.server.options;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.exceptions.ExpressionParsingException;
import org.apache.drill.exec.server.options.OptionValue.Kind;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
@@ -155,6 +157,35 @@ public class TypeValidators {
}
}
+ /**
+ * Validator for POJO passed in as JSON string
+ */
+ public static class JsonStringValidator extends StringValidator {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private final Class<?> clazz;
+
+ public JsonStringValidator(final String name, final Class<?> clazz, final String def) {
+ super(name, def);
+ this.clazz = clazz;
+ validateJson(def, clazz);
+ }
+
+ @Override
+ public void validate(final OptionValue v) throws ExpressionParsingException {
+ super.validate(v);
+ validateJson(v.string_val, clazz);
+ }
+
+ private static void validateJson(final String jsonString, final Class<?> clazz) {
+ try {
+ mapper.readValue(jsonString, clazz);
+ } catch (IOException e) {
+ throw new ExpressionParsingException("Invalid JSON string (" + jsonString + ") for class " + clazz.getName(), e);
+ }
+ }
+ }
+
public static abstract class TypeValidator extends OptionValidator {
private final Kind kind;
private final OptionValue defaultValue;
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 62f5bdb..aa43aa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -136,7 +137,7 @@ public class QueryWrapper {
}
@Override
- public void queryCompleted() {
+ public void queryCompleted(QueryState state) {
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
index 68cbf08..61f0d67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
@@ -17,38 +17,48 @@
*/
package org.apache.drill.exec.testing;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Injection for a single exception. Specifies how many times to inject it, and how many times to skip
* injecting it before the first injection. This class is used internally for tracking injected
* exceptions; injected exceptions are specified via the
- * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_EXCEPTION_INJECTIONS} system option.
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
*/
-public class ExceptionInjection {
- private final String desc; // description of the injection site
-
- private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
- private final AtomicInteger nThrow; // the number of times to do the injection, after any skips; starts > 0
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class ExceptionInjection extends Injection {
private final Class<? extends Throwable> exceptionClass;
- /**
- * Constructor.
- *
- * @param desc description of the injection site; useful for multiple injections in a single class
- * @param nSkip non-negative number of times to skip injecting the exception
- * @param nFire positive number of times to inject the exception
- * @param exceptionClass
- */
- public ExceptionInjection(final String desc, final int nSkip, final int nFire,
- final Class<? extends Throwable> exceptionClass) {
- this.desc = desc;
- this.nSkip = new AtomicInteger(nSkip);
- this.nThrow = new AtomicInteger(nFire);
- this.exceptionClass = exceptionClass;
+ @JsonCreator // ensures instances are created only through JSON
+ private ExceptionInjection(@JsonProperty("address") final String address,
+ @JsonProperty("port") final int port,
+ @JsonProperty("siteClass") final String siteClass,
+ @JsonProperty("desc") final String desc,
+ @JsonProperty("nSkip") final int nSkip,
+ @JsonProperty("nFire") final int nFire,
+ @JsonProperty("exceptionClass") String classString) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, nSkip, nFire);
+ final Class<?> clazz;
+ try {
+ clazz = Class.forName(classString);
+ } catch (ClassNotFoundException e) {
+ throw new InjectionConfigurationException("Injected exceptionClass not found.", e);
+ }
+
+ if (!Throwable.class.isAssignableFrom(clazz)) {
+ throw new InjectionConfigurationException("Injected exceptionClass is not a Throwable.");
+ }
+
+ @SuppressWarnings("unchecked")
+ final Class<? extends Throwable> exceptionClazz = (Class<? extends Throwable>) clazz;
+ this.exceptionClass = exceptionClazz;
}
/**
@@ -57,29 +67,25 @@ public class ExceptionInjection {
* @return the exception to throw, or null if it isn't time to throw it
*/
private Throwable constructException() {
- final int remainingSkips = nSkip.decrementAndGet();
- if (remainingSkips >= 0) {
- return null;
- }
-
- final int remainingFirings = nThrow.decrementAndGet();
- if (remainingFirings < 0) {
+ if (! injectNow()) {
return null;
}
// if we get here, we should throw the specified exception
- Constructor<?> constructor;
+ final Constructor<?> constructor;
try {
constructor = exceptionClass.getConstructor(String.class);
- } catch(NoSuchMethodException e) {
- throw new RuntimeException("No constructor found that takes a single String argument");
+ } catch (NoSuchMethodException e) {
+ // this should not throw; validated already.
+ throw new RuntimeException("No constructor found that takes a single String argument.");
}
- Throwable throwable;
+ final Throwable throwable;
try {
- throwable = (Throwable) constructor.newInstance(desc);
- } catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
- throw new IllegalStateException("Couldn't construct exception instance", e);
+ throwable = (Throwable) constructor.newInstance(getDesc());
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ // this should not throw; validated already.
+ throw new IllegalStateException("Couldn't construct exception instance.", e);
}
return throwable;
@@ -105,16 +111,16 @@ public class ExceptionInjection {
throw e;
}
- throw new IllegalStateException("throwable was not an unchecked exception");
+ throw new IllegalStateException("Throwable was not an unchecked exception.");
}
/**
* Throw the checked exception specified by this injection.
*
* @param exceptionClass the class of the exception to throw
- * @throws T if it is time to throw the exception
+ * @throws T if it is time to throw the exception
* @throws IllegalStateException if it is time to throw the exception, and the exception's class
- * is incompatible with the class specified by the injection
+ * is incompatible with the class specified by the injection
*/
public <T extends Throwable> void throwChecked(final Class<T> exceptionClass) throws T {
final Throwable throwable = constructException();
@@ -128,6 +134,6 @@ public class ExceptionInjection {
}
throw new IllegalStateException("Constructed Throwable(" + throwable.getClass().getName()
- + ") is incompatible with exceptionClass("+ exceptionClass.getName() + ")");
+ + ") is incompatible with exceptionClass(" + exceptionClass.getName() + ")");
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
deleted file mode 100644
index 54bc351..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import org.apache.drill.exec.server.DrillbitContext;
-
-/**
- * Injects exceptions at execution time for testing. Any class that wants to simulate exceptions
- * for testing should have it's own private static instance of an injector (similar to the use
- * of loggers).
- *
- * <p>See {@link org.apache.drill.exec.testing.TestExceptionInjection} for examples of
- * use.
- */
-public class ExceptionInjector {
- private final Class<?> clazz; // the class that owns this injector
-
- /**
- * Constructor. Classes should use the static {@link #getInjector()} method to obtain
- * their injector.
- *
- * @param clazz the owning class
- */
- private ExceptionInjector(final Class<?> clazz) {
- this.clazz = clazz;
- }
-
- /**
- * Create an injector.
- *
- * @param clazz the owning class
- * @return the newly created injector
- */
- public static ExceptionInjector getInjector(final Class<?> clazz) {
- return new ExceptionInjector(clazz);
- }
-
- /**
- * Get the injector's owning class.
- *
- * @return the injector's owning class
- */
- public Class<?> getSiteClass() {
- return clazz;
- }
-
- /**
- * Lookup an injection within this class that matches the site description.
- *
- * @param drillbitContext
- * @param desc the site description
- * @return the injection, if there is one; null otherwise
- */
- private ExceptionInjection getInjection(final DrillbitContext drillbitContext, final String desc) {
- final SimulatedExceptions simulatedExceptions = drillbitContext.getSimulatedExceptions();
- final ExceptionInjection exceptionInjection = simulatedExceptions.lookupInjection(drillbitContext, this, desc);
- return exceptionInjection;
- }
-
- /**
- * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
- * for it to be thrown.
- *
- * <p>Implementors use this in their code at a site where they want to simulate an exception
- * during testing.
- *
- * @param drillbitContext
- * @param desc the site description
- * throws the exception specified by the injection, if it is time
- */
- public void injectUnchecked(final DrillbitContext drillbitContext, final String desc) {
- final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
- if (exceptionInjection != null) {
- exceptionInjection.throwUnchecked();
- }
- }
-
- /**
- * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
- * for it to be thrown.
- *
- * <p>Implementors use this in their code at a site where they want to simulate an exception
- * during testing.
- *
- * @param drillbitContext
- * @param desc the site description
- * @param exceptionClass the expected class of the exception (or a super class of it)
- * @throws T the exception specified by the injection, if it is time
- */
- public <T extends Throwable> void injectChecked(
- final DrillbitContext drillbitContext, final String desc, final Class<T> exceptionClass) throws T {
- final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
- if (exceptionInjection != null) {
- exceptionInjection.throwChecked(exceptionClass);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
new file mode 100644
index 0000000..1171bf8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.server.options.TypeValidators.TypeValidator;
+import org.apache.drill.exec.testing.InjectionSite.InjectionSiteKeyDeserializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tracks the simulated controls that will be injected for testing purposes.
+ */
+public final class ExecutionControls {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionControls.class);
+
+ // used to map JSON specified injections to POJOs
+ public static final ObjectMapper controlsOptionMapper = new ObjectMapper();
+
+ static {
+ controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
+ }
+
+ // Jackson MixIn for all types of injections
+ @JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+ @JsonSubTypes({
+ @Type(value = ExceptionInjection.class, name = "exception"),
+ @Type(value = PauseInjection.class, name = "pause")})
+ public static abstract class InjectionMixIn {
+ }
+
+ /**
+ * The JSON specified for the {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS}
+ * option is validated using this class. Controls are short-lived options.
+ */
+ public static class ControlsOptionValidator extends TypeValidator {
+
+ private final int ttl; // the number of queries for which this option is valid
+
+ /**
+ * Constructor for controls option validator.
+ * @param name the name of the validator
+ * @param def the default JSON, specified as string
+ * @param ttl the number of queries for which this option should be valid
+ */
+ public ControlsOptionValidator(final String name, final String def, final int ttl) {
+ super(name, OptionValue.Kind.DOUBLE, OptionValue.createString(OptionType.SESSION, name, def));
+ assert ttl > 0;
+ this.ttl = ttl;
+ }
+
+ @Override
+ public int getTtl() {
+ return ttl;
+ }
+
+ @Override
+ public boolean isShortLived() {
+ return true;
+ }
+
+ @Override
+ public void validate(final OptionValue v) throws ExpressionParsingException {
+ if (v.type != OptionType.SESSION) {
+ throw new ExpressionParsingException("Controls can be set only at SESSION level.");
+ }
+ final String jsonString = v.string_val;
+ try {
+ controlsOptionMapper.readValue(jsonString, Controls.class);
+ } catch (IOException e) {
+ throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
+ }
+ }
+ }
+
+ /**
+ * POJO used to parse JSON-specified controls.
+ */
+ public static class Controls {
+ public Collection<? extends Injection> injections;
+ }
+
+ /**
+ * The default value for controls.
+ */
+ public static final String DEFAULT_CONTROLS = "{}";
+
+ /**
+ * Caches the currently specified controls.
+ */
+ @JsonDeserialize(keyUsing = InjectionSiteKeyDeserializer.class)
+ private final Map<InjectionSite, Injection> controls = new HashMap<>();
+
+ private final DrillbitEndpoint endpoint; // the current endpoint
+
+ public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) {
+ this.endpoint = endpoint;
+
+ final OptionValue optionValue = options.getOption(ExecConstants.DRILLBIT_CONTROL_INJECTIONS);
+ if (optionValue == null) {
+ return;
+ }
+
+ final String opString = optionValue.string_val;
+ final Controls controls;
+ try {
+ controls = controlsOptionMapper.readValue(opString, Controls.class);
+ } catch (IOException e) {
+ // This never happens. opString must have been validated.
+ logger.warn("Could not parse injections. Injections must have been validated before this point.");
+ throw new DrillRuntimeException("Could not parse injections.", e);
+ }
+ if (controls.injections == null) {
+ return;
+ }
+
+ logger.debug("Adding control injections: \n{}", opString);
+ for (final Injection injection : controls.injections) {
+ this.controls.put(new InjectionSite(injection.getSiteClass(), injection.getDesc()), injection);
+ }
+ }
+
+ /**
+ * Look for an exception injection matching the given injector, site description and endpoint.
+ *
+ * @param injector the injector, which indicates a class
+ * @param desc the injection site description
+ * @return the exception injection, if there is one for the injector, site and endpoint; null otherwise
+ */
+ public ExceptionInjection lookupExceptionInjection(final ExecutionControlsInjector injector, final String desc) {
+ final Injection injection = lookupInjection(injector, desc);
+ return injection != null ? (ExceptionInjection) injection : null;
+ }
+
+ /**
+ * Look for an pause injection matching the given injector, site description and endpoint.
+ *
+ * @param injector the injector, which indicates a class
+ * @param desc the injection site description
+ * @return the pause injection, if there is one for the injector, site and endpoint; null otherwise
+ */
+ public PauseInjection lookupPauseInjection(final ExecutionControlsInjector injector, final String desc) {
+ final Injection injection = lookupInjection(injector, desc);
+ return injection != null ? (PauseInjection) injection : null;
+ }
+
+ private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
+ if (controls.isEmpty()) {
+ return null;
+ }
+
+ // lookup the request
+ final InjectionSite site = new InjectionSite(injector.getSiteClass(), desc);
+ final Injection injection = controls.get(site);
+ if (injection == null) {
+ return null;
+ }
+ // return only if injection was meant for this drillbit
+ return injection.isValidForBit(endpoint) ? injection : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
new file mode 100644
index 0000000..4b1cd0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.util.AssertionUtil;
+import org.slf4j.Logger;
+
+/**
+ * Injects exceptions and pauses at execution time for testing. Any class that wants to simulate exceptions
+ * or inject pauses for testing should have it's own private static instance of an injector (similar to the use
+ * of loggers). Injection site either use {@link org.apache.drill.exec.ops.FragmentContext} or
+ * {@link org.apache.drill.exec.ops.QueryContext}. See {@link org.apache.drill.exec.testing.TestExceptionInjection} and
+ * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use.
+ */
+public class ExecutionControlsInjector {
+// private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionControlsInjector.class);
+
+ private final Class<?> clazz; // the class that owns this injector
+
+ /**
+ * Constructor. Classes should use the static {@link #getInjector} method to obtain their injector.
+ *
+ * @param clazz the owning class
+ */
+ protected ExecutionControlsInjector(final Class<?> clazz) {
+ this.clazz = clazz;
+ }
+
+ /**
+ * Create an injector if assertions are enabled
+ *
+ * @param clazz the owning class
+ * @return the newly created injector
+ */
+ public static ExecutionControlsInjector getInjector(final Class<?> clazz) {
+ if (AssertionUtil.isAssertionsEnabled()) {
+ return new ExecutionControlsInjector(clazz);
+ } else {
+ return new NoOpControlsInjector(clazz);
+ }
+ }
+
+ /**
+ * Get the injector's owning class.
+ *
+ * @return the injector's owning class
+ */
+ public Class<?> getSiteClass() {
+ return clazz;
+ }
+
+ /**
+ * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
+ * for it to be thrown.
+ * <p/>
+ * <p>Implementors use this in their code at a site where they want to simulate an exception
+ * during testing.
+ *
+ * @param executionControls the controls in the current context
+ * @param desc the site description
+ * throws the exception specified by the injection, if it is time
+ */
+ public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+ final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
+ if (exceptionInjection != null) {
+ exceptionInjection.throwUnchecked();
+ }
+ return this;
+ }
+
+ /**
+ * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
+ * for it to be thrown.
+ * <p/>
+ * <p>Implementors use this in their code at a site where they want to simulate an exception
+ * during testing.
+ *
+ * @param executionControls the controls in the current context
+ * @param desc the site description
+ * @param exceptionClass the expected class of the exception (or a super class of it)
+ * @throws T the exception specified by the injection, if it is time
+ */
+ public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
+ final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
+ if (exceptionInjection != null) {
+ exceptionInjection.throwChecked(exceptionClass);
+ }
+ return this;
+ }
+
+ /**
+ * Pauses at this point, if such an injection is specified (i.e. matches the site description).
+ * <p/>
+ * <p>Implementors use this in their code at a site where they want to simulate a pause
+ * during testing.
+ *
+ * @param executionControls the controls in the current context
+ * @param desc the site description
+ * @param logger logger of the class containing the injection site
+ */
+ public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ final Logger logger) {
+ final PauseInjection pauseInjection =
+ executionControls.lookupPauseInjection(this, desc);
+
+ if (pauseInjection != null) {
+ logger.debug("Pausing at {}", desc);
+ pauseInjection.pause();
+ logger.debug("Resuming at {}", desc);
+ }
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
new file mode 100644
index 0000000..96fed3a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The base class for all types of injections (currently, pause and exception).
+ */
+public abstract class Injection {
+
+ protected final String address; // the address of the drillbit on which to inject
+ protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
+ private final Class<?> siteClass; // the class where the injection should happen
+ private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+ private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
+ private final AtomicInteger nFire; // the number of times to do the injection, after any skips; starts > 0
+
+ protected Injection(final String address, final int port, final String siteClass, final String desc,
+ final int nSkip, final int nFire) throws InjectionConfigurationException {
+ if (desc == null || desc.isEmpty()) {
+ throw new InjectionConfigurationException("Injection desc is null or empty.");
+ }
+
+ if (nSkip < 0) {
+ throw new InjectionConfigurationException("Injection nSkip is not non-negative.");
+ }
+
+ if (nFire <= 0) {
+ throw new InjectionConfigurationException("Injection nFire is non-positive.");
+ }
+ try {
+ this.siteClass = Class.forName(siteClass);
+ } catch (ClassNotFoundException e) {
+ throw new InjectionConfigurationException("Injection siteClass not found.", e);
+ }
+
+ this.address = address;
+ this.port = port;
+ this.desc = desc;
+ this.nSkip = new AtomicInteger(nSkip);
+ this.nFire = new AtomicInteger(nFire);
+ }
+
+ /**
+ * This function checks if it is the right time for the injection to happen.
+ *
+ * @return if the injection should be injected now
+ */
+ protected final boolean injectNow() {
+ return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public Class<?> getSiteClass() {
+ return siteClass;
+ }
+
+ // If the address is null, the injection must happen on every drillbit that reaches the specified site.
+ public final boolean isValidForBit(final DrillbitEndpoint endpoint) {
+ return address == null ||
+ (address.equals(endpoint.getAddress()) && port == endpoint.getUserPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
new file mode 100644
index 0000000..4fcb33a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * An Exception thrown when injection configuration is incorrect.
+ */
+public class InjectionConfigurationException extends Exception {
+ public InjectionConfigurationException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+ public InjectionConfigurationException(String message) {
+ super(message);
+ }
+
+ public InjectionConfigurationException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
index 9e19fdd..2bb9acc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
@@ -17,8 +17,13 @@
*/
package org.apache.drill.exec.testing;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
import com.google.common.base.Preconditions;
+import java.io.IOException;
+
public class InjectionSite {
private final Class<?> clazz;
private final String desc;
@@ -31,14 +36,6 @@ public class InjectionSite {
this.desc = desc;
}
- public Class<?> getSiteClass() {
- return clazz;
- }
-
- public String getDesc() {
- return desc;
- }
-
@Override
public boolean equals(Object o) {
if (o == null) {
@@ -65,8 +62,35 @@ public class InjectionSite {
return true;
}
+ private static final String SEPARATOR = ",";
+
+ @Override
+ public String toString() {
+ return clazz.getName() + SEPARATOR + desc;
+ }
+
@Override
public int hashCode() {
return (clazz.hashCode() + 13) ^ (1 - desc.hashCode());
}
+
+ /**
+ * Key Deserializer for InjectionSite.
+ * Since JSON object keys must be strings, deserialize from a string.
+ */
+ public static class InjectionSiteKeyDeserializer extends KeyDeserializer {
+
+ @Override
+ public Object deserializeKey(final String key, final DeserializationContext context)
+ throws IOException, JsonProcessingException {
+ final String[] fields = key.split(SEPARATOR);
+ final Class<?> siteClass;
+ try {
+ siteClass = Class.forName(fields[0]);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Class " + fields[0] + " not found.", e);
+ }
+ return new InjectionSite(siteClass, fields[1]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
new file mode 100644
index 0000000..80d9790
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.slf4j.Logger;
+
+/**
+ * An injector that does not inject any controls.
+ */
+public final class NoOpControlsInjector extends ExecutionControlsInjector {
+
+ protected NoOpControlsInjector(final Class<?> clazz) {
+ super(clazz);
+ }
+
+ @Override
+ public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+ return this;
+ }
+
+ @Override
+ public <T extends Throwable> ExecutionControlsInjector injectChecked(
+ final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
+ return this;
+ }
+
+ @Override
+ public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+ final Logger logger) {
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
new file mode 100644
index 0000000..e5f9c9c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+/**
+ * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
+ * injected pauses; these pauses are specified via
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class PauseInjection extends Injection {
+
+ private final long millis;
+
+ @JsonCreator // ensures instances are created only through JSON
+ private PauseInjection(@JsonProperty("address") final String address,
+ @JsonProperty("port") final int port,
+ @JsonProperty("siteClass") final String siteClass,
+ @JsonProperty("desc") final String desc,
+ @JsonProperty("nSkip") final int nSkip,
+ @JsonProperty("nFire") final int nFire,
+ @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
+ super(address, port, siteClass, desc, nSkip, nFire);
+ if (millis <= 0) {
+ throw new InjectionConfigurationException("Pause millis is non-positive.");
+ }
+ this.millis = millis;
+ }
+
+ public void pause() {
+ if (! injectNow()) {
+ return;
+ }
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ throw new DrillRuntimeException("Well, I should be sleeping.");
+ }
+ }
+}