You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/21 05:00:21 UTC

[1/2] drill git commit: DRILL-2383: Support to inject exceptions and pauses in various components of Drill + Controls are fired only if assertions are enabled + Controls can be introduced in any class that has access to FragmentContext/QueryContext + Con

Repository: drill
Updated Branches:
  refs/heads/master 21dfe7ac8 -> be8d95393


http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
deleted file mode 100644
index 0292c08..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Tracks the simulated exceptions that will be injected for testing purposes.
- */
-public class SimulatedExceptions {
-  /**
-   * Caches the currently specified ExceptionInjections. Updated when
-   * {@link org.apache.drill.exec.ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS} is noticed
-   * to have changed.
-   */
-  private HashMap<InjectionSite, ExceptionInjection> exMap = null;
-
-  /**
-   * The string that was parsed to produce exMap; we keep it as a means to quickly detect whether
-   * the option string has changed or not between calls to getOption().
-   */
-  private String exString = null;
-
-  /**
-   * POJO used to parse JSON-specified exception injection.
-   */
-  public static class InjectionOption {
-    public String siteClass;
-    public String desc;
-    public int nSkip;
-    public int nFire;
-    public String exceptionClass;
-  }
-
-  /**
-   * POJO used to parse JSON-specified set of exception injections.
-   */
-  public static class InjectionOptions {
-    public InjectionOption injections[];
-  }
-
-  /**
-   * Look for an exception injection matching the given injector and site description.
-   *
-   * @param drillbitContext
-   * @param injector the injector, which indicates a class
-   * @param desc the injection site description
-   * @return the exception injection, if there is one for the injector and site; null otherwise
-   */
-  public synchronized ExceptionInjection lookupInjection(
-      final DrillbitContext drillbitContext, final ExceptionInjector injector, final String desc) {
-    // get the option string
-    final OptionManager optionManager = drillbitContext.getOptionManager();
-    final OptionValue optionValue = optionManager.getOption(ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS);
-    final String opString = optionValue.string_val;
-
-    // if the option string is empty, there's nothing to inject
-    if ((opString == null) || opString.isEmpty()) {
-      // clear these in case there used to be something to inject
-      exMap = null;
-      exString = null;
-      return null;
-    }
-
-    // if the option string is different from before, recreate the injection map
-    if ((exString == null) || (exString != opString) && !exString.equals(opString)) {
-      // parse the option string into JSON
-      final ObjectMapper objectMapper = new ObjectMapper();
-      InjectionOptions injectionOptions;
-      try {
-        injectionOptions = objectMapper.readValue(opString, InjectionOptions.class);
-      } catch(IOException e) {
-        throw new RuntimeException("Couldn't parse exception injections", e);
-      }
-
-      // create a new map from the option JSON
-      exMap = new HashMap<>();
-      for(InjectionOption injectionOption : injectionOptions.injections) {
-        addToMap(exMap, injectionOption);
-      }
-
-      // this is the current set of options in effect
-      exString = opString;
-    }
-
-    // lookup the request
-    final InjectionSite injectionSite = new InjectionSite(injector.getSiteClass(), desc);
-    final ExceptionInjection injection = exMap.get(injectionSite);
-    return injection;
-  }
-
-  /**
-   * Adds a single exception injection to the injection map
-   *
-   * <p>Validates injection options before adding to the map, and throws various exceptions for
-   * validation failures.
-   *
-   * @param exMap the injection map
-   * @param injectionOption the option to add
-   */
-  private static void addToMap(
-      final HashMap<InjectionSite, ExceptionInjection> exMap, final InjectionOption injectionOption) {
-    Class<?> siteClass;
-    try {
-      siteClass = Class.forName(injectionOption.siteClass);
-    } catch(ClassNotFoundException e) {
-      throw new RuntimeException("Injection siteClass not found", e);
-    }
-
-    if ((injectionOption.desc == null) || injectionOption.desc.isEmpty()) {
-      throw new RuntimeException("Injection desc is null or empty");
-    }
-
-    if (injectionOption.nSkip < 0) {
-      throw new RuntimeException("Injection nSkip is not non-negative");
-    }
-
-    if (injectionOption.nFire <= 0) {
-      throw new RuntimeException("Injection nFire is non-positive");
-    }
-
-    Class<?> clazz;
-    try {
-      clazz = Class.forName(injectionOption.exceptionClass);
-    } catch(ClassNotFoundException e) {
-      throw new RuntimeException("Injected exceptionClass not found", e);
-    }
-
-    if (!Throwable.class.isAssignableFrom(clazz)) {
-      throw new RuntimeException("Injected exceptionClass is not a Throwable");
-    }
-
-    @SuppressWarnings("unchecked")
-    final Class<? extends Throwable> exceptionClass = (Class<? extends Throwable>) clazz;
-
-    final InjectionSite injectionSite = new InjectionSite(siteClass, injectionOption.desc);
-    final ExceptionInjection exceptionInjection = new ExceptionInjection(
-        injectionOption.desc, injectionOption.nSkip, injectionOption.nFire, exceptionClass);
-    exMap.put(injectionSite, exceptionInjection);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d94b9f0..d94ffba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -67,7 +67,7 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.testing.ExceptionInjector;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
@@ -100,7 +100,7 @@ import com.google.common.collect.Sets;
  */
 public class Foreman implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-  private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(Foreman.class);
   private static final int RPC_WAIT_IN_SECONDS = 90;
 
   private final QueryId queryId;
@@ -190,8 +190,7 @@ public class Foreman implements Runnable {
     queryManager.markStartTime();
 
     try {
-      injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class);
-
+      injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
       // convert a run query request into action
       switch (queryRequest.getType()) {
       case LOGICAL:
@@ -206,7 +205,7 @@ public class Foreman implements Runnable {
       default:
         throw new IllegalStateException();
       }
-      injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
+      injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
     } catch (final ForemanException e) {
       moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {
@@ -346,6 +345,7 @@ public class Foreman implements Runnable {
     drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
     logger.debug("Submitting fragments to run.");
+    injector.injectPause(queryContext.getExecutionControls(), "pause-run-plan", logger);
 
     // set up the root fragment first so we'll have incoming buffers available.
     setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
@@ -586,6 +586,7 @@ public class Foreman implements Runnable {
       Preconditions.checkState(resultState != null);
 
       logger.info("foreman cleaning up.");
+      injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
 
       // These are straight forward removals from maps, so they won't throw.
       drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
@@ -900,6 +901,7 @@ public class Foreman implements Runnable {
           .build();
     }
 
+    injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class);
     /*
      * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
      * the regular sendListener event delivery.
@@ -927,7 +929,7 @@ public class Foreman implements Runnable {
     }
     final InitializeFragments initFrags = fb.build();
 
-    logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
+    logger.debug("Sending remote fragments to \nNode:\n{} \n\nData:\n{}", assignment, initFrags);
     final FragmentSubmitListener listener =
         new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
     controller.getTunnel(assignment).sendFragments(listener, initFrags);

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 3570ba5..be798ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -35,6 +36,8 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
@@ -43,6 +46,7 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
  */
 public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
 
   private final String fragmentName;
   private final FragmentRoot rootOperator;
@@ -101,16 +105,29 @@ public class FragmentExecutor implements Runnable {
 
   /**
    * Cancel the execution of this fragment is in an appropriate state. Messages come from external.
+   * NOTE that this can be called from threads *other* than the one running this runnable(),
+   * so we need to be careful about the state transitions that can result.
    */
   public void cancel() {
-    acceptExternalEvents.awaitUninterruptibly();
-
     /*
-     * Note that this can be called from threads *other* than the one running this runnable(), so we need to be careful
-     * about the state transitions that can result. We set the cancel requested flag but the actual cancellation is
-     * managed by the run() loop.
+     * When cancel() is called before run(), root is not initialized and the executor is not
+     * ready to accept external events. So do not wait to change the state.
+     *
+     * For example, consider the case when the Foreman sets up the root fragment executor which is
+     * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
+     * run() method on the root executor will never be called, and the executor will never be ready
+     * to accept external events. This will make the cancelling thread wait forever.
      */
-    updateState(FragmentState.CANCELLATION_REQUESTED);
+    synchronized (this) {
+      if (root != null) {
+        acceptExternalEvents.awaitUninterruptibly();
+      }
+
+      /*
+       * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
+       */
+      updateState(FragmentState.CANCELLATION_REQUESTED);
+    }
   }
 
   /**
@@ -137,7 +154,8 @@ public class FragmentExecutor implements Runnable {
     final Thread myThread = Thread.currentThread();
     final String originalThreadName = myThread.getName();
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
-    final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
+    final DrillbitContext drillbitContext = fragmentContext.getDrillbitContext();
+    final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
     final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
     final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
 
@@ -145,16 +163,26 @@ public class FragmentExecutor implements Runnable {
 
       myThread.setName(newThreadName);
 
-      root = ImplCreator.getExec(fragmentContext, rootOperator);
+      synchronized (this) {
+        /*
+         * fragmentState might have changed even before this method is called e.g. cancel()
+         */
+        if (shouldContinue()) {
+          root = ImplCreator.getExec(fragmentContext, rootOperator);
 
-      clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
-      updateState(FragmentState.RUNNING);
+          clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+          updateState(FragmentState.RUNNING);
 
-      acceptExternalEvents.countDown();
+          acceptExternalEvents.countDown();
 
-      logger.debug("Starting fragment runner. {}:{}",
-          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+          final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+          logger.debug("Starting fragment {}:{} on {}:{}",
+            fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+            endpoint.getAddress(), endpoint.getUserPort());
+        }
+      }
 
+      injector.injectChecked(fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
       /*
        * Run the query until root.next returns false OR we no longer need to continue.
        */
@@ -163,7 +191,6 @@ public class FragmentExecutor implements Runnable {
       }
 
       updateState(FragmentState.FINISHED);
-
     } catch (AssertionError | Exception e) {
       fail(e);
     } finally {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 854f474..8854ef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -24,6 +24,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -32,6 +34,12 @@ public class UserWorker{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
 
   private final WorkerBee bee;
+  private final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
+    @Override
+    public void increment(final UserSession session) {
+      session.incrementQueryCount(this);
+    }
+  };
 
   public UserWorker(WorkerBee bee) {
     super();
@@ -46,6 +54,7 @@ public class UserWorker{
     long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
     long p2 = r.nextLong();
     QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
+    incrementer.increment(connection.getSession());
     Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
     bee.addNewForeman(foreman);
     return id;

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index d8ed0b3..2ff4de7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -370,7 +371,7 @@ public class BaseTestQuery extends ExecTest {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       System.out.println("Query completed successfully with row count: " + count.get());
       latch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 264123f..c3223b8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.QueryOptionManager;
@@ -44,6 +43,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.testing.ExecutionControls;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
 
@@ -77,8 +77,10 @@ public class PlanningBase extends ExecTest{
 
     final SystemOptionManager systemOptions = new SystemOptionManager(config, provider);
     systemOptions.init();
-    final SessionOptionManager sessionOptions = new SessionOptionManager(systemOptions);
+    final UserSession userSession = UserSession.Builder.newBuilder().withOptionManager(systemOptions).build();
+    final SessionOptionManager sessionOptions = (SessionOptionManager) userSession.getOptions();
     final QueryOptionManager queryOptions = new QueryOptionManager(sessionOptions);
+    final ExecutionControls executionControls = new ExecutionControls(queryOptions, DrillbitEndpoint.getDefaultInstance());
 
     new NonStrictExpectations() {
       {
@@ -126,6 +128,8 @@ public class PlanningBase extends ExecTest{
         result = table;
         context.getAllocator();
         result = allocator;
+        context.getExecutionControls();
+        result = executionControls;
       }
     };
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 99aa9fc..2cd5c95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -58,7 +58,8 @@ public abstract class SingleRowListener implements UserResultsListener {
   }
 
   @Override
-  public void queryCompleted() {
+  public void queryCompleted(QueryState state) {
+    queryState = state;
     try {
       cleanup();
     } finally {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index f5f5b8d..f2240cc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
 import org.apache.drill.exec.server.options.SessionOptionManager;
@@ -43,7 +44,10 @@ public class TestClassTransformation extends BaseTestQuery {
 
   @BeforeClass
   public static void beforeTestClassTransformation() throws Exception {
-    sessionOptions = new SessionOptionManager(getDrillbitContext().getOptionManager());
+    final UserSession userSession = UserSession.Builder.newBuilder()
+      .withOptionManager(getDrillbitContext().getOptionManager())
+      .build();
+    sessionOptions = (SessionOptionManager) userSession.getOptions();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 7aee6d3..ee0e841 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,14 +17,7 @@
  */
 package org.apache.drill.exec.server;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.commons.math3.util.Pair;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
 import org.apache.drill.common.AutoCloseables;
@@ -32,41 +25,57 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.ExceptionWrapper;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.testing.ExceptionInjectionUtil;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.ForemanException;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points.
+ * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
  */
-public class TestDrillbitResilience extends ExecTest {
+public class TestDrillbitResilience {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
 
   private static ZookeeperHelper zkHelper;
@@ -74,6 +83,13 @@ public class TestDrillbitResilience extends ExecTest {
   private static final Map<String, Drillbit> drillbits = new HashMap<>();
   private static DrillClient drillClient;
 
+  /**
+   * Note: Querying sys.memory executes a fragment on every drillbit. This is a better check in comparison to
+   * querying sys.drillbits.
+   */
+  private static final String TEST_QUERY = "select * from sys.memory";
+  private static final long PAUSE_TIME_MILLIS = 1000L;
+
   private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
     if (drillbits.containsKey(name)) {
       throw new IllegalStateException("Drillbit named \"" + name + "\" already exists");
@@ -83,7 +99,7 @@ public class TestDrillbitResilience extends ExecTest {
       @SuppressWarnings("resource")
       final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
       drillbits.put(name, drillbit);
-    } catch(DrillbitStartupException e) {
+    } catch (final DrillbitStartupException e) {
       throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e);
     }
   }
@@ -91,7 +107,7 @@ public class TestDrillbitResilience extends ExecTest {
   /**
    * Shutdown the specified drillbit.
    *
-   * @param name
+   * @param name name of the drillbit
    */
   private static void stopDrillbit(final String name) {
     @SuppressWarnings("resource")
@@ -102,7 +118,7 @@ public class TestDrillbitResilience extends ExecTest {
 
     try {
       drillbit.close();
-    } catch(Exception e) {
+    } catch (final Exception e) {
       final String message = "Error shutting down Drillbit \"" + name + "\"";
       System.err.println(message + '.');
       logger.warn(message, e);
@@ -113,9 +129,10 @@ public class TestDrillbitResilience extends ExecTest {
    * Shutdown all the drillbits.
    */
   private static void stopAllDrillbits() {
-    for(String name : drillbits.keySet()) {
+    for (String name : drillbits.keySet()) {
       stopDrillbit(name);
     }
+    drillbits.clear();
   }
 
   /*
@@ -140,11 +157,11 @@ public class TestDrillbitResilience extends ExecTest {
     startDrillbit(DRILLBIT_ALPHA, remoteServiceSet);
     startDrillbit(DRILLBIT_BETA, remoteServiceSet);
     startDrillbit(DRILLBIT_GAMMA, remoteServiceSet);
-    clearAllInjections();
 
     // create a client
     final DrillConfig drillConfig = zkHelper.getConfig();
     drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, null);
+    clearAllInjections();
   }
 
   @AfterClass
@@ -165,17 +182,16 @@ public class TestDrillbitResilience extends ExecTest {
   }
 
   /**
-   * Clear all injections from all drillbits.
+   * Clear all injected controls (exceptions and pauses) for the client's session.
    */
   private static void clearAllInjections() {
-    for(Drillbit drillbit : drillbits.values()) {
-      ExceptionInjectionUtil.clearInjections(drillbit);
-    }
+    assertTrue(drillClient != null);
+    ControlsInjectionUtil.clearControls(drillClient);
   }
 
   /**
    * Check that all the drillbits are ok.
-   *
+   * <p/>
    * <p>The current implementation does this by counting the number of drillbits using a
    * query.
    */
@@ -190,7 +206,7 @@ public class TestDrillbitResilience extends ExecTest {
             final QueryData queryData = queryResultBatch.getHeader();
             try {
               loader.load(queryData.getDef(), queryResultBatch.getData());
-            } catch(SchemaChangeException e) {
+            } catch (final SchemaChangeException e) {
               fail(e.toString());
             }
             assertEquals(1, loader.getRecordCount());
@@ -223,10 +239,10 @@ public class TestDrillbitResilience extends ExecTest {
         };
 
     try {
-      QueryTestUtil.testWithListener(
-          drillClient, QueryType.SQL, "select count(*) from sys.drillbits", listener);
+      QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener);
       listener.waitForCompletion();
-    } catch(Exception e) {
+      assertTrue(listener.getQueryState() == QueryState.COMPLETED);
+    } catch (final Exception e) {
       throw new RuntimeException("Couldn't query active drillbits", e);
     }
 
@@ -242,88 +258,91 @@ public class TestDrillbitResilience extends ExecTest {
   }
 
   /**
-   * Set the given injections on a single named drillbit.
-   *
-   * @param bitName
-   * @param injectionOptions the injections
-   */
-  private static void setInjections(final String bitName, final InjectionOptions injectionOptions) {
-    @SuppressWarnings("resource")
-    final Drillbit drillbit = drillbits.get(bitName);
-    if (drillbit == null) {
-      throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found");
-    }
-
-    ExceptionInjectionUtil.setInjections(drillbit, injectionOptions);
-  }
-
-  /**
-   * Set the given injections on all drillbits.
-   *
-   * @param injectionOptions the injections
+   * Set the given controls, passed as a JSON string, for the client's session.
    */
-  private static void setInjectionsAll(final InjectionOptions injectionOptions) {
-    for(Drillbit drillbit : drillbits.values()) {
-      ExceptionInjectionUtil.setInjections(drillbit, injectionOptions);
-    }
+  private static void setExceptions(final String controlsString) {
+    ControlsInjectionUtil.setControls(drillClient, controlsString);
   }
 
   /**
    * Create a single exception injection.
    *
-   * @param siteClassName the name of the injection site class
-   * @param desc the injection site description
-   * @param exceptionClassName the name of the exception to throw
-   * @return the created injection options POJO
+   * @param siteClass      the injection site class
+   * @param desc           the injection site description
+   * @param exceptionClass the class of the exception to throw
+   * @return the created controls JSON as string
    */
-  private static InjectionOptions createSingleInjection(
-      final String siteClassName, final String desc, final String exceptionClassName) {
-    final InjectionOption injectionOption = new InjectionOption();
-    injectionOption.nFire = 1;
-    injectionOption.siteClass = siteClassName;
-    injectionOption.desc = desc;
-    injectionOption.exceptionClass = exceptionClassName;
-
-    final InjectionOptions injectionOptions = new InjectionOptions();
-    injectionOptions.injections = new InjectionOption[1];
-    injectionOptions.injections[0] = injectionOption;
-
-    return injectionOptions;
+  private static String createSingleException(final Class<?> siteClass, final String desc,
+                                              final Class<? extends Throwable> exceptionClass) {
+    final String siteClassName = siteClass.getName();
+    final String exceptionClassName = exceptionClass.getName();
+    return "{\"injections\":[{"
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"" + siteClassName + "\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1,"
+      + "\"exceptionClass\":\"" + exceptionClassName + "\""
+      + "}]}";
   }
 
   /**
    * Create a single exception injection.
    *
-   * @param siteClass the injection site class
-   * @param desc the injection site description
+   * @param siteClass      the injection site class
+   * @param desc           the injection site description
    * @param exceptionClass the class of the exception to throw
-   * @return the created injection options POJO
+   * @param bitName        the name of the drillbit to inject the exception into
+   * @return the created controls JSON as string
    */
-  private static InjectionOptions createSingleInjection(
-      final Class<?> siteClass, final String desc, final Class<? extends Throwable> exceptionClass) {
-    return createSingleInjection(siteClass.getName(), desc, exceptionClass.getName());
+  private static String createSingleExceptionOnBit(final Class<?> siteClass, final String desc,
+                                                   final Class<? extends Throwable> exceptionClass,
+                                                   final String bitName) {
+    final String siteClassName = siteClass.getName();
+    final String exceptionClassName = exceptionClass.getName();
+    @SuppressWarnings("resource")
+    final Drillbit drillbit = drillbits.get(bitName);
+    if (drillbit == null) {
+      throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found");
+    }
+
+    final DrillbitEndpoint endpoint = drillbit.getContext().getEndpoint();
+    return "{\"injections\":[{"
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\","
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"" + siteClassName + "\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1,"
+      + "\"exceptionClass\":\"" + exceptionClassName + "\""
+      + "}]}";
   }
 
   /**
    * Check that the injected exception is what we were expecting.
    *
-   * @param caught the exception that was caught (by the test)
+   * @param throwable      the throwable that was caught (by the test)
    * @param exceptionClass the expected exception class
-   * @param desc the expected exception site description
+   * @param desc           the expected exception site description
    */
-  private static void assertInjected(
-      final UserException caught, final Class<? extends Throwable> exceptionClass, final String desc) {
-    ExceptionWrapper cause = caught.getOrCreatePBError(false).getException();
+  private static void assertExceptionInjected(final Throwable throwable,
+                                              final Class<? extends Throwable> exceptionClass, final String desc) {
+    assertTrue(throwable instanceof UserException);
+    final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
     assertEquals(exceptionClass.getName(), cause.getExceptionClass());
     assertEquals(desc, cause.getMessage());
   }
 
   @Test
-  public void testSettingNoopInjectionsAndQuery() throws Exception {
-    final InjectionOptions injectionOptions =
-        createSingleInjection(getClass(), "noop", RuntimeException.class);
-    setInjections(DRILLBIT_BETA, injectionOptions);
-    QueryTestUtil.test(drillClient, "select * from sys.drillbits");
+  public void settingNoopInjectionsAndQuery() {
+    final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
+    setExceptions(controls);
+    try {
+      QueryTestUtil.test(drillClient, TEST_QUERY);
+    } catch (final Exception e) {
+      fail(e.getMessage());
+    }
   }
 
   /**
@@ -331,51 +350,25 @@ public class TestDrillbitResilience extends ExecTest {
    * description
    *
    * @param desc site description
-   * @throws Exception
    */
-  private static void testForeman(final String desc) throws Exception {
-    final InjectionOptions injectionOptions = createSingleInjection(Foreman.class, desc, ForemanException.class);
-    setInjectionsAll(injectionOptions);
+  private static void testForeman(final String desc) {
+    final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
+    setExceptions(controls);
     try {
-      QueryTestUtil.test(drillClient, "select * from sys.drillbits");
+      QueryTestUtil.test(drillClient, TEST_QUERY);
       fail();
-    } catch(UserException dre) {
-      assertInjected(dre, ForemanException.class, desc);
+    } catch (final Exception e) {
+      assertExceptionInjected(e, ForemanException.class, desc);
     }
   }
 
   @SuppressWarnings("static-method")
   @Test
-  public void testForeman_runTryBeginning() throws Exception {
+  public void foreman_runTryBeginning() {
     testForeman("run-try-beginning");
   }
 
-  @SuppressWarnings("static-method")
-  @Test
-  public void testForeman_setInjectionViaAlterSystem() throws Exception {
-    final String exceptionDesc = "run-try-beginning";
-    final InjectionOptions injectionOptions =
-        createSingleInjection(Foreman.class, exceptionDesc, ForemanException.class);
-    final ObjectMapper objectMapper = new ObjectMapper();
-    final String jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(injectionOptions);
-    final String alterSession = String.format(
-        "alter system set `%s`='%s'",
-        ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, jsonString);
-    QueryTestUtil.test(drillClient, alterSession);
-    try {
-      QueryTestUtil.test(drillClient, "select * from sys.drillbits");
-      fail();
-    } catch(UserException dre) {
-      assertInjected(dre, ForemanException.class, exceptionDesc);
-    }
-  }
-
   /*
-   * This test doesn't work because worker threads have returned the result to the client before
-   * Foreman.run() has even finished executing. This might not happen if the results are larger.
-   * This brings up the question of how we detect failed queries, because here a failure is happening
-   * after the query starts running, yet apparently the query still succeeds.
-   *
    * TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way
    * that it gates input via stateListener. That could be tricky, since some results could be
    * queued up before Foreman has gotten through it's run(), and they would all have to be sent
@@ -389,8 +382,234 @@ public class TestDrillbitResilience extends ExecTest {
    */
   @SuppressWarnings("static-method")
   @Test
-  @Ignore
-  public void testForeman_runTryEnd() throws Exception {
+  public void foreman_runTryEnd() {
     testForeman("run-try-end");
   }
+
+  private static class WaitUntilCompleteListener implements UserResultsListener {
+    protected final CountDownLatch latch;
+    protected QueryId queryId = null;
+    protected Exception ex = null;
+    protected QueryState state = null;
+
+    public WaitUntilCompleteListener(final int count) {
+      latch = new CountDownLatch(count);
+    }
+
+    @Override
+    public void queryIdArrived(final QueryId queryId) {
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void submissionFailed(final UserException ex) {
+      this.ex = ex;
+      state = QueryState.FAILED;
+      latch.countDown();
+    }
+
+    @Override
+    public void queryCompleted(final QueryState state) {
+      this.state = state;
+      latch.countDown();
+    }
+
+    @Override
+    public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+      result.release();
+    }
+
+    public final Pair<QueryState, Exception> waitForCompletion() {
+      try {
+        latch.await();
+      } catch (final InterruptedException e) {
+        return new Pair<QueryState, Exception>(state, e);
+      }
+      return new Pair<>(state, ex);
+    }
+  }
+
+  private static class CancellingThread extends Thread {
+
+    private final QueryId queryId;
+
+    public CancellingThread(final QueryId queryId) {
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void run() {
+      final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId);
+      try {
+        cancelAck.checkedGet();
+      } catch (final RpcException e) {
+        fail(e.getMessage()); // currently this failure does not fail the test
+      }
+    }
+  }
+
+  /**
+   * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
+   */
+  private static void assertCancelled(final String controls, final WaitUntilCompleteListener listener) {
+    ControlsInjectionUtil.setControls(drillClient, controls);
+
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertTrue(result.getFirst() == QueryState.CANCELED);
+    assertTrue(result.getSecond() == null);
+  }
+
+  @Test // Cancellation TC 1
+  public void cancelBeforeAnyResultsArrive() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+
+      @Override
+      public void queryIdArrived(final QueryId queryId) {
+        (new CancellingThread(queryId)).start();
+      }
+    };
+
+    final String controls = "{\"injections\":[{"
+      + "\"type\":\"pause\"," +
+      "\"siteClass\":\"" + Foreman.class.getName() + "\","
+      + "\"desc\":\"pause-run-plan\","
+      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1"
+      + "}]}";
+
+    assertCancelled(controls, listener);
+  }
+
+  @Test // Cancellation TC 2
+  public void cancelInMiddleOfFetchingResults() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+      private boolean cancelRequested = false;
+
+      @Override
+      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+        if (! cancelRequested) {
+          assertTrue(queryId != null);
+          (new CancellingThread(queryId)).start();
+          cancelRequested = true;
+        }
+        result.release();
+      }
+    };
+
+    final String controls = "{\"injections\":[{"
+      + "\"type\":\"pause\"," +
+      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
+      + "\"desc\":\"sending-data\","
+      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1"
+      + "}]}";
+
+    assertCancelled(controls, listener);
+  }
+
+
+  @Test // Cancellation TC 3
+  public void cancelAfterAllResultsProduced() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+      private int count = 0;
+
+      @Override
+      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+        if (++count == drillbits.size()) {
+          assertTrue(queryId != null);
+          (new CancellingThread(queryId)).start();
+        }
+        result.release();
+      }
+    };
+
+    final String controls = "{\"injections\":[{"
+      + "\"type\":\"pause\"," +
+      "\"siteClass\":\"" + ScreenCreator.class.getName() + "\","
+      + "\"desc\":\"send-complete\","
+      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1"
+      + "}]}";
+
+    assertCancelled(controls, listener);
+  }
+
+  @Test // Cancellation TC 4
+  public void cancelAfterEverythingIsCompleted() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1) {
+      private int count = 0;
+
+      @Override
+      public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+        if (++count == drillbits.size()) {
+          assertTrue(queryId != null);
+          (new CancellingThread(queryId)).start();
+        }
+        result.release();
+      }
+    };
+
+    final String controls = "{\"injections\":[{"
+      + "\"type\":\"pause\"," +
+      "\"siteClass\":\"" + Foreman.class.getName() + "\","
+      + "\"desc\":\"foreman-cleanup\","
+      + "\"millis\":" + PAUSE_TIME_MILLIS + ","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1"
+      + "}]}";
+
+    assertCancelled(controls, listener);
+  }
+
+  @Test // Completion TC 1
+  public void successfullyCompletes() {
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
+    QueryTestUtil.testWithListener(
+      drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertTrue(result.getFirst() == QueryState.COMPLETED);
+    assertTrue(result.getSecond() == null);
+  }
+
+  /**
+   * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
+   */
+  private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+                                               final String exceptionDesc) {
+    setExceptions(controls);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(1);
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL,  TEST_QUERY, listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    assertTrue(result.getFirst() == QueryState.FAILED);
+    final Exception e = result.getSecond();
+    assertExceptionInjected(e, exceptionClass, exceptionDesc);
+  }
+
+  @Test // Completion TC 2
+  public void failsWhenParsing() {
+    final String exceptionDesc = "sql-parsing";
+    final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
+    final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+  }
+
+  @Test // Completion TC 3
+  public void failsWhenSendingFragments() {
+    final String exceptionDesc = "send-fragments";
+    final Class<? extends Throwable> exceptionClass = ForemanException.class;
+    final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+  }
+
+  @Test // Completion TC 4
+  public void failsDuringExecution() {
+    final String exceptionDesc = "fragment-execution";
+    final Class<? extends Throwable> exceptionClass = IOException.class;
+    final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 3a794a9..0e80f91 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
@@ -67,7 +68,7 @@ public class ParquetResultListener implements UserResultsListener {
   }
 
   @Override
-  public void queryCompleted() {
+  public void queryCompleted(QueryState state) {
     checkLastChunk();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
index cfe52c2..e1b03d5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -96,7 +97,7 @@ public class TestParquetPhysicalPlan extends ExecTest {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       latch.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
new file mode 100644
index 0000000..346c6dd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.testing.ExecutionControls.Controls;
+
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Static methods for constructing exception and pause injections for testing purposes.
+ */
+public class ControlsInjectionUtil {
+  /**
+   * Constructor. Prevent instantiation of static utility class.
+   */
+  private ControlsInjectionUtil() {
+  }
+
+  private static final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
+    @Override
+    public void increment(final UserSession session) {
+      session.incrementQueryCount(this);
+    }
+  };
+
+  public static void setControls(final DrillClient drillClient, final String controls) {
+    validateControlsString(controls);
+    try {
+      final List<QueryDataBatch> results = drillClient.runQuery(
+        UserBitShared.QueryType.SQL, String.format("alter session set `%s` = '%s'",
+          ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls));
+      for (final QueryDataBatch data : results) {
+        data.release();
+      }
+    } catch (RpcException e) {
+      fail("Could not set controls options: " + e.toString());
+    }
+  }
+
+  public static void setControls(final UserSession session, final String controls) {
+    validateControlsString(controls);
+    final OptionValue opValue = OptionValue.createString(OptionValue.OptionType.SESSION,
+      ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
+
+    final OptionManager options = session.getOptions();
+    try {
+      options.getAdmin().validate(opValue);
+      options.setOption(opValue);
+    } catch (Exception e) {
+      fail("Could not set controls options: " + e.getMessage());
+    }
+    incrementer.increment(session); // to simulate that a query completed
+  }
+
+  private static void validateControlsString(final String controls) {
+    try {
+      ExecutionControls.controlsOptionMapper.readValue(controls, Controls.class);
+    } catch (Exception e) {
+      fail("Could not validate controls JSON: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Clears all the controls.
+   */
+  public static void clearControls(final DrillClient client) {
+    setControls(client, ExecutionControls.DEFAULT_CONTROLS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
deleted file mode 100644
index bf93dee..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Static methods for constructing exception injections for testing purposes.
- */
-public class ExceptionInjectionUtil {
-  /**
-   * Constructor. Prevent instantiation of static utility class.
-   */
-  private ExceptionInjectionUtil() {
-  }
-
-  /**
-   * Add a set of injections to a drillbit.
-   *
-   * @param drillbit the drillbit
-   * @param injections the JSON-specified injections
-   */
-  public static void setInjections(final Drillbit drillbit, final String injections) {
-    final DrillbitContext drillbitContext = drillbit.getContext();
-    final OptionValue stringValue = OptionValue.createString(
-        OptionValue.OptionType.SYSTEM, ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, injections);
-    final OptionManager optionManager = drillbitContext.getOptionManager();
-    optionManager.setOption(stringValue);
-  }
-
-  /**
-   * Add a set of injections to a drillbit.
-   *
-   * @param drillbit the drillbit
-   * @param injectionOptions the injections, specified using the parsing POJOs
-   */
-  public static void setInjections(final Drillbit drillbit, final InjectionOptions injectionOptions) {
-    final ObjectMapper objectMapper = new ObjectMapper();
-    final StringWriter stringWriter = new StringWriter();
-    try {
-      objectMapper.writeValue(stringWriter, injectionOptions);
-    } catch(IOException e) {
-      throw new RuntimeException("Couldn't serialize injectionOptions to JSON", e);
-    }
-
-    setInjections(drillbit, stringWriter.toString());
-  }
-
-  /**
-   * Clear all injections on a drillbit.
-   *
-   * @param drillbit the drillbit
-   */
-  public static void clearInjections(final Drillbit drillbit) {
-    setInjections(drillbit, "");
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index d0c0279..2cba992 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -17,31 +17,41 @@
  */
 package org.apache.drill.exec.testing;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
-import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
+import org.apache.drill.exec.server.RemoteServiceSet;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class TestExceptionInjection extends BaseTestQuery {
-  private final static String NO_THROW_FAIL = "Didn't throw expected exception";
+  private static final String NO_THROW_FAIL = "Didn't throw expected exception";
+
+  private static final UserSession session = UserSession.Builder.newBuilder()
+    .withOptionManager(bits[0].getContext().getOptionManager())
+    .build();
 
   /**
-   * Class whose methods we want to simulate exceptions at run-time for testing
-   * purposes.
+   * Class whose methods we want to simulate exceptions at run-time for testing
+   * purposes. The class must have access to QueryId, UserSession and DrillbitEndpoint.
+   * For instance, these are accessible from {@link org.apache.drill.exec.ops.QueryContext}.
    */
-  public static class DummyClass {
-    private final static ExceptionInjector injector = ExceptionInjector.getInjector(DummyClass.class);
-    private final DrillbitContext drillbitContext;
+  private static class DummyClass {
+    private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+    private final QueryContext context;
 
-    public DummyClass(final DrillbitContext drillbitContext) {
-      this.drillbitContext = drillbitContext;
+    public DummyClass(final QueryContext context) {
+      this.context = context;
     }
 
     /**
@@ -53,7 +63,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       // ... code ...
 
       // simulated unchecked exception
-      injector.injectUnchecked(drillbitContext, desc);
+      injector.injectUnchecked(context.getExecutionControls(), desc);
 
       // ... code ...
     }
@@ -69,7 +79,7 @@ public class TestExceptionInjection extends BaseTestQuery {
       // ... code ...
 
       // simulated IOException
-      injector.injectChecked(drillbitContext, THROWS_IOEXCEPTION, IOException.class);
+      injector.injectChecked(context.getExecutionControls(), THROWS_IOEXCEPTION, IOException.class);
 
       // ... code ...
     }
@@ -77,36 +87,30 @@ public class TestExceptionInjection extends BaseTestQuery {
 
   @SuppressWarnings("static-method")
   @Test
-  public void testNoInjection() throws Exception {
-    test("select * from sys.drillbits");
-  }
-
-  private static void setInjections(final String jsonInjections) {
-    for(Drillbit bit : bits) {
-      ExceptionInjectionUtil.setInjections(bit, jsonInjections);
-    }
+  public void noInjection() throws Exception {
+    test("select * from sys.memory");
   }
 
   @SuppressWarnings("static-method")
   @Test
-  public void testEmptyInjection() throws Exception {
-    setInjections("{\"injections\":[]}");
-    test("select * from sys.drillbits");
+  public void emptyInjection() throws Exception {
+    ControlsInjectionUtil.setControls(session, "{\"injections\":[]}");
+    test("select * from sys.memory");
   }
 
   /**
    * Assert that DummyClass.descPassThroughMethod does indeed throw the expected exception.
    *
-   * @param dummyClass the instance of DummyClass
+   * @param dummyClass         the instance of DummyClass
    * @param exceptionClassName the expected exception
-   * @param exceptionDesc the expected exception site description
+   * @param exceptionDesc      the expected exception site description
    */
   private static void assertPassthroughThrows(
-      final DummyClass dummyClass, final String exceptionClassName, final String exceptionDesc) {
+    final DummyClass dummyClass, final String exceptionClassName, final String exceptionDesc) {
     try {
       dummyClass.descPassthroughMethod(exceptionDesc);
       fail(NO_THROW_FAIL);
-    } catch(Exception e) {
+    } catch (Exception e) {
       assertEquals(exceptionClassName, e.getClass().getName());
       assertEquals(exceptionDesc, e.getMessage());
     }
@@ -114,79 +118,192 @@ public class TestExceptionInjection extends BaseTestQuery {
 
   @SuppressWarnings("static-method")
   @Test
-  public void testUncheckedStringInjection() {
-    // set injections via a string
+  public void uncheckedInjection() {
+    // set exceptions via a string
     final String exceptionDesc = "<<injected from descPassthroughMethod()>>";
     final String exceptionClassName = "java.lang.RuntimeException";
     final String jsonString = "{\"injections\":[{"
-        + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
-        + "\"desc\":\"" + exceptionDesc + "\","
-        + "\"nSkip\":0,"
-        + "\"nFire\":1,"
-        + "\"exceptionClass\":\"" + exceptionClassName + "\""
-        + "}]}";
-    setInjections(jsonString);
+      + "\"type\":\"exception\"," +
+      "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+      + "\"desc\":\"" + exceptionDesc + "\","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1,"
+      + "\"exceptionClass\":\"" + exceptionClassName + "\""
+      + "}]}";
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    final QueryContext context = new QueryContext(session, bits[0].getContext());
 
     // test that the exception gets thrown
-    final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+    final DummyClass dummyClass = new DummyClass(context);
     assertPassthroughThrows(dummyClass, exceptionClassName, exceptionDesc);
+    try {
+      context.close();
+    } catch (Exception e) {
+      fail();
+    }
   }
 
-  private static InjectionOptions buildDefaultJson() {
-    final InjectionOption injectionOption = new InjectionOption();
-    injectionOption.siteClass = "org.apache.drill.exec.testing.TestExceptionInjection$DummyClass";
-    injectionOption.desc = DummyClass.THROWS_IOEXCEPTION;
-    injectionOption.nSkip = 0;
-    injectionOption.nFire = 1;
-    injectionOption.exceptionClass = "java.io.IOException";
-    final InjectionOptions injectionOptions = new InjectionOptions();
-    injectionOptions.injections = new InjectionOption[1];
-    injectionOptions.injections[0] = injectionOption;
-    return injectionOptions;
+  private static String createException(final String desc, final int nSkip, final int nFire,
+                                        final String exceptionClass) {
+    return "{\"injections\":[{"
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\": " + nSkip + ","
+      + "\"nFire\": " + nFire + ","
+      + "\"exceptionClass\":\"" + exceptionClass + "\""
+      + "}]}";
+  }
+
+  private static String createExceptionOnBit(final DrillbitEndpoint endpoint, final String desc, final int nSkip,
+                                             final int nFire, final String exceptionClass) {
+    return "{\"injections\":[{"
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\","
+      + "\"type\":\"exception\","
+      + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\": " + nSkip + ","
+      + "\"nFire\": " + nFire + ","
+      + "\"exceptionClass\":\"" + exceptionClass + "\""
+      + "}]}";
   }
 
   @SuppressWarnings("static-method")
   @Test
-  public void testCheckedJsonInjection() {
+  public void checkedInjection() {
     // set the injection via the parsing POJOs
-    final InjectionOptions injectionOptions = buildDefaultJson();
-    ExceptionInjectionUtil.setInjections(bits[0], injectionOptions);
+    final String controls = createException(DummyClass.THROWS_IOEXCEPTION, 0, 1, IOException.class.getName());
+    ControlsInjectionUtil.setControls(session, controls);
+
+    final QueryContext context = new QueryContext(session, bits[0].getContext());
 
     // test that the expected exception (checked) gets thrown
-    final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+    final DummyClass dummyClass = new DummyClass(context);
     try {
       dummyClass.throwsIOException();
       fail(NO_THROW_FAIL);
-    } catch(IOException e) {
+    } catch (IOException e) {
       assertEquals(DummyClass.THROWS_IOEXCEPTION, e.getMessage());
     }
+    try {
+      context.close();
+    } catch (Exception e) {
+      fail();
+    }
   }
 
   @SuppressWarnings("static-method")
   @Test
-  public void testSkipAndLimit() {
+  public void skipAndLimit() {
     final String passthroughDesc = "<<injected from descPassthrough>>";
-    final InjectionOptions injectionOptions = buildDefaultJson();
-    final InjectionOption injectionOption = injectionOptions.injections[0];
-    injectionOption.desc = passthroughDesc;
-    injectionOption.nSkip = 7;
-    injectionOption.nFire = 3;
-    injectionOption.exceptionClass = RuntimeException.class.getName();
-    ExceptionInjectionUtil.setInjections(bits[0], injectionOptions);
+    final int nSkip = 7;
+    final int nFire = 3;
+    final String exceptionClass = RuntimeException.class.getName();
+    final String controls = createException(passthroughDesc, nSkip, nFire, exceptionClass);
+    ControlsInjectionUtil.setControls(session, controls);
+
+    final QueryContext context = new QueryContext(session, bits[0].getContext());
 
-    final DummyClass dummyClass = new DummyClass(bits[0].getContext());
+    final DummyClass dummyClass = new DummyClass(context);
 
     // these shouldn't throw
-    for(int i = 0; i < injectionOption.nSkip; ++i) {
+    for (int i = 0; i < nSkip; ++i) {
       dummyClass.descPassthroughMethod(passthroughDesc);
     }
 
     // these should throw
-    for(int i = 0; i < injectionOption.nFire; ++i) {
-      assertPassthroughThrows(dummyClass, injectionOption.exceptionClass, passthroughDesc);
+    for (int i = 0; i < nFire; ++i) {
+      assertPassthroughThrows(dummyClass, exceptionClass, passthroughDesc);
     }
 
     // this shouldn't throw
     dummyClass.descPassthroughMethod(passthroughDesc);
+    try {
+      context.close();
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  @SuppressWarnings("static-method")
+  @Test
+  public void injectionOnSpecificBit() {
+    final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
+    final ZookeeperHelper zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
+
+    // Creating two drillbits
+    final Drillbit drillbit1, drillbit2;
+    final DrillConfig drillConfig = zkHelper.getConfig();
+    try {
+      drillbit1 = Drillbit.start(drillConfig, remoteServiceSet);
+      drillbit2 = Drillbit.start(drillConfig, remoteServiceSet);
+    } catch (DrillbitStartupException e) {
+      throw new RuntimeException("Failed to start drillbits.", e);
+    }
+
+    final DrillbitContext drillbitContext1 = drillbit1.getContext();
+    final DrillbitContext drillbitContext2 = drillbit2.getContext();
+
+    final UserSession session = UserSession.Builder.newBuilder()
+      .withOptionManager(drillbitContext1.getOptionManager())
+      .build();
+
+    final String passthroughDesc = "<<injected from descPassthrough>>";
+    final int nSkip = 7;
+    final int nFire = 3;
+    final String exceptionClass = RuntimeException.class.getName();
+    // only drillbit1's (address, port)
+    final String controls = createExceptionOnBit(drillbitContext1.getEndpoint(), passthroughDesc, nSkip, nFire,
+      exceptionClass);
+
+    ControlsInjectionUtil.setControls(session, controls);
+
+    {
+      final QueryContext queryContext1 = new QueryContext(session, drillbitContext1);
+      final DummyClass class1 = new DummyClass(queryContext1);
+
+      // these shouldn't throw
+      for (int i = 0; i < nSkip; ++i) {
+        class1.descPassthroughMethod(passthroughDesc);
+      }
+
+      // these should throw
+      for (int i = 0; i < nFire; ++i) {
+        assertPassthroughThrows(class1, exceptionClass, passthroughDesc);
+      }
+
+      // this shouldn't throw
+      class1.descPassthroughMethod(passthroughDesc);
+      try {
+        queryContext1.close();
+      } catch (Exception e) {
+        fail();
+      }
+    }
+    {
+      final QueryContext queryContext2 = new QueryContext(session, drillbitContext2);
+      final DummyClass class2 = new DummyClass(queryContext2);
+
+      // these shouldn't throw
+      for (int i = 0; i < nSkip; ++i) {
+        class2.descPassthroughMethod(passthroughDesc);
+      }
+
+      // these shouldn't throw either: the injection is bound to drillbit1's (address, port)
+      for (int i = 0; i < nFire; ++i) {
+        class2.descPassthroughMethod(passthroughDesc);
+      }
+
+      // this shouldn't throw
+      class2.descPassthroughMethod(passthroughDesc);
+      try {
+        queryContext2.close();
+      } catch (Exception e) {
+        fail();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
new file mode 100644
index 0000000..1c219f0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestPauseInjection extends BaseTestQuery {
+
+  private static final UserSession session = UserSession.Builder.newBuilder()
+    .withOptionManager(bits[0].getContext().getOptionManager())
+    .build();
+
+  /**
+   * Class whose methods we want to simulate pauses at run-time for testing
+   * purposes. The class must have access to {@link org.apache.drill.exec.ops.QueryContext} or
+   * {@link org.apache.drill.exec.ops.FragmentContext}.
+   */
+  private static class DummyClass {
+    private static final Logger logger = org.slf4j.LoggerFactory.getLogger(DummyClass.class);
+    private static final ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DummyClass.class);
+
+    private final QueryContext context;
+
+    public DummyClass(final QueryContext context) {
+      this.context = context;
+    }
+
+    public static final String PAUSES = "<<pauses>>";
+
+    /**
+     * Method that pauses.
+     *
+     * @return how long the method paused in milliseconds
+     */
+    public long pauses() {
+      // ... code ...
+
+      final long startTime = System.currentTimeMillis();
+      // simulated pause
+      injector.injectPause(context.getExecutionControls(), PAUSES, logger);
+      final long endTime = System.currentTimeMillis();
+
+      // ... code ...
+      return (endTime - startTime);
+    }
+  }
+
+  @Test
+  public void pauseInjected() {
+    final long pauseMillis = 1000L;
+    final String jsonString = "{\"injections\":[{"
+      + "\"type\":\"pause\"," +
+      "\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
+      + "\"desc\":\"" + DummyClass.PAUSES + "\","
+      + "\"millis\":" + pauseMillis + ","
+      + "\"nSkip\":0,"
+      + "\"nFire\":1"
+      + "}]}";
+
+    ControlsInjectionUtil.setControls(session, jsonString);
+
+    final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
+
+    // test that the pause happens
+    final DummyClass dummyClass = new DummyClass(queryContext);
+    final long time = dummyClass.pauses();
+    assertTrue((time >= pauseMillis));
+    try {
+      queryContext.close();
+    } catch (Exception e) {
+      fail();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index d2302fb..484a5e5 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -32,6 +32,7 @@ import net.hydromatic.avatica.AvaticaStatement;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RecordBatchLoader;
@@ -191,7 +192,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       releaseIfFirst();
       completed = true;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/protocol/readme.txt
----------------------------------------------------------------------
diff --git a/protocol/readme.txt b/protocol/readme.txt
index bd516d3..6f502c4 100644
--- a/protocol/readme.txt
+++ b/protocol/readme.txt
@@ -8,6 +8,6 @@ To regenerate the sources after making changes to .proto files
 in your PATH (you may need to download and build it first). You can 
 download it from http://code.google.com/p/protobuf/downloads/list.
 
-2. Run "mvn process-sources -P proto-compile".
+2. In the protocol directory, run "mvn process-sources -P proto-compile" or "mvn clean install -P proto-compile".
 
 3. Check in the new/updated files.
\ No newline at end of file


[2/2] drill git commit: DRILL-2383: Support to inject exceptions and pauses in various components of Drill + Controls are fired only if assertions are enabled + Controls can be introduced in any class that has access to FragmentContext/QueryContext + Con

Posted by pa...@apache.org.
DRILL-2383: Support to inject exceptions and pauses in various components of Drill
+ Controls are fired only if assertions are enabled
+ Controls can be introduced in any class that has access to FragmentContext/QueryContext
+ Controls can be fired by altering the DRILLBIT_CONTROL_INJECTIONS session option
+ Renames: SimulatedExceptions => ExecutionControls, ExceptionInjector => ExecutionControlsInjector
+ Added injection sites in Foreman, DrillSqlWorker, FragmentExecutor
+ Unit tests in TestDrillbitResilience, TestExceptionInjection and TestPauseInjection

Other commits included:
+ DRILL-2437: Moved ExecutionControls from DrillbitContext to FragmentContext/QueryContext
+ DRILL-2382: Added address and port to Injection to specify drillbit
+ DRILL-2384: Added QueryState to SingleRowListener and assert that state is COMPLETED while testing

Other edits:
+ Support for short-lived session options in SessionOptionManager (using TTL in OptionValidator)
+ Introduced query count in UserSession
+ Added QueryState to queryCompleted() in UserResultsListener to check if COMPLETED/CANCELED
+ Added JSONStringValidator to TypeValidators
+ Log query id as string in DrillClient, WorkEventBus, QueryResultHandler
+ Use try..catch block only around else clause for OptionList in FragmentContext
+ Fixed drillbitContext spelling error in QueryContext
+ Fixed state transition when cancel() before run() in FragmentExecutor
+ Do not call setLocalOption twice in FallbackOptionManager
+ Show explicitly that submitWork() returns queryId in UserServer
+ Updated protocol/readme.txt to include an alternative way to generate sources


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be8d9539
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be8d9539
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be8d9539

Branch: refs/heads/master
Commit: be8d953935461ee6567b0c4d96c503e8b04469d2
Parents: 21dfe7a
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri Apr 17 09:46:11 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Mon Apr 20 19:58:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   7 +-
 .../apache/drill/exec/client/DrillClient.java   |   7 +-
 .../exec/client/PrintingResultsListener.java    |   3 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  26 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  14 +-
 .../drill/exec/physical/impl/ScreenCreator.java |   4 +
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   6 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |  20 +-
 .../drill/exec/rpc/user/QueryResultHandler.java |  35 +-
 .../exec/rpc/user/UserResultsListener.java      |   4 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |  10 +-
 .../apache/drill/exec/rpc/user/UserSession.java |  26 +-
 .../drill/exec/server/DrillbitContext.java      |   5 -
 .../server/options/FallbackOptionManager.java   |   2 -
 .../exec/server/options/OptionManager.java      |   1 +
 .../exec/server/options/OptionValidator.java    |  26 ++
 .../server/options/SessionOptionManager.java    |  75 ++-
 .../server/options/SystemOptionManager.java     |   7 +-
 .../exec/server/options/TypeValidators.java     |  31 ++
 .../drill/exec/server/rest/QueryWrapper.java    |   3 +-
 .../drill/exec/testing/ExceptionInjection.java  |  84 ++--
 .../drill/exec/testing/ExceptionInjector.java   | 112 -----
 .../drill/exec/testing/ExecutionControls.java   | 193 ++++++++
 .../exec/testing/ExecutionControlsInjector.java | 129 ++++++
 .../apache/drill/exec/testing/Injection.java    |  84 ++++
 .../InjectionConfigurationException.java        |  35 ++
 .../drill/exec/testing/InjectionSite.java       |  40 +-
 .../exec/testing/NoOpControlsInjector.java      |  48 ++
 .../drill/exec/testing/PauseInjection.java      |  63 +++
 .../drill/exec/testing/SimulatedExceptions.java | 164 -------
 .../apache/drill/exec/work/foreman/Foreman.java |  14 +-
 .../exec/work/fragment/FragmentExecutor.java    |  55 ++-
 .../apache/drill/exec/work/user/UserWorker.java |   9 +
 .../java/org/apache/drill/BaseTestQuery.java    |   3 +-
 .../java/org/apache/drill/PlanningBase.java     |   8 +-
 .../org/apache/drill/SingleRowListener.java     |   3 +-
 .../exec/compile/TestClassTransformation.java   |   6 +-
 .../exec/server/TestDrillbitResilience.java     | 461 ++++++++++++++-----
 .../store/parquet/ParquetResultListener.java    |   3 +-
 .../store/parquet/TestParquetPhysicalPlan.java  |   3 +-
 .../exec/testing/ControlsInjectionUtil.java     |  95 ++++
 .../exec/testing/ExceptionInjectionUtil.java    |  82 ----
 .../exec/testing/TestExceptionInjection.java    | 257 ++++++++---
 .../drill/exec/testing/TestPauseInjection.java  |  96 ++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     |   3 +-
 protocol/readme.txt                             |   2 +-
 46 files changed, 1682 insertions(+), 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7d89ac9..f7648b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 public interface ExecConstants {
   public static final String ZK_RETRY_TIMES = "drill.exec.zk.retry.count";
@@ -216,7 +217,7 @@ public interface ExecConstants {
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
   public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
 
-  public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
-  public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
-      new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
+  public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
+  public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
+    new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 336a149..0d29f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
@@ -326,7 +327,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       future.set(results);
     }
 
@@ -352,7 +353,9 @@ public class DrillClient implements Closeable, ConnectionThrottle {
 
     @Override
     public void queryIdArrived(QueryId queryId) {
-      logger.debug( "Query ID arrived: {}", queryId );
+      if (logger.isDebugEnabled()) {
+        logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId));
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 2bf35b1..64e7266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -60,7 +61,7 @@ public class PrintingResultsListener implements UserResultsListener {
   }
 
   @Override
-  public void queryCompleted() {
+  public void queryCompleted(QueryState state) {
     allocator.close();
     latch.countDown();
     System.out.println("Total rows returned: " + count.get());

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index c46613d..9400355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +64,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
-  private final UserClientConnection connection;
+  private final UserClientConnection connection; // is null if attached to non-root fragment
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
@@ -73,6 +74,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private final OptionManager fragmentOptions;
   private final BufferManager bufferManager;
   private ExecutorState executorState;
+  private final ExecutionControls executionControls;
 
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -98,17 +100,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
 
-    try {
-      final OptionList list;
-      if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
-        list = new OptionList();
-      } else {
+    final OptionList list;
+    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
+      list = new OptionList();
+    } else {
+      try {
         list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+      } catch (final Exception e) {
+        throw new ExecutionSetupException("Failure while reading plan options.", e);
       }
-      fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
-    } catch (final Exception e) {
-      throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
+    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+
+    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
 
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
@@ -288,6 +292,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     allocator.setFragmentLimit(limit);
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   @Override
   public void close() {
     waitForSendComplete();

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2fa0b18..2dcac25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.store.PartitionExplorerImpl;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 // TODO except for a couple of tests, this is only created by Foreman
 // TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -52,6 +53,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   private final OptionManager queryOptions;
   private final PlannerSettings plannerSettings;
   private final DrillOperatorTable table;
+  private final ExecutionControls executionControls;
 
   private final BufferAllocator allocator;
   private final BufferManager bufferManager;
@@ -65,10 +67,11 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
    */
   private boolean closed = false;
 
-  public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
-    this.drillbitContext = drllbitContext;
+  public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
+    this.drillbitContext = drillbitContext;
     this.session = session;
     queryOptions = new QueryOptionManager(session.getOptions());
+    executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
     plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
     plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
     table = new DrillOperatorTable(getFunctionRegistry());
@@ -78,7 +81,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
 
     try {
-      allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
+      allocator = drillbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
           MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
     } catch (OutOfMemoryException e) {
       throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
@@ -87,7 +90,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     bufferManager = new BufferManager(this.allocator, null);
   }
 
-
   public PlannerSettings getPlannerSettings() {
     return plannerSettings;
   }
@@ -120,6 +122,10 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return queryOptions;
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   public DrillbitEndpoint getCurrentEndpoint() {
     return drillbitContext.getEndpoint();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 6b3caf4..2069d35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -34,9 +34,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class ScreenCreator implements RootCreator<Screen>{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ScreenCreator.class);
 
 
 
@@ -107,6 +109,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         materializer = new VectorRecordMaterializer(context, incoming);
         //$FALL-THROUGH$
       case OK:
+        injector.injectPause(context.getExecutionControls(), "sending-data", logger);
         QueryWritableBatch batch = materializer.convertNext();
         updateStats(batch);
         stats.startWait();
@@ -139,6 +142,7 @@ public class ScreenCreator implements RootCreator<Screen>{
       if (!oContext.isClosed()) {
         internalStop();
       }
+      injector.injectPause(context.getExecutionControls(), "send-complete", logger);
     }
 
     RecordBatch getIncoming() {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 9ca64d8..097b7bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -45,7 +45,9 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
 import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
 import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.rules.ReduceExpressionsRule;
@@ -60,7 +62,8 @@ import org.eigenbase.sql.parser.SqlParseException;
 import org.eigenbase.sql.parser.SqlParser;
 
 public class DrillSqlWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DrillSqlWorker.class);
 
   private final Planner planner;
   private final HepPlanner hepPlanner;
@@ -119,6 +122,7 @@ public class DrillSqlWorker {
   public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
     SqlNode sqlNode;
     try {
+      injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
       sqlNode = planner.parse(sql);
     } catch (SqlParseException e) {
       throw new QueryInputException("Failure parsing SQL. " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index a5a5441..d90096a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -45,13 +45,17 @@ public class WorkEventBus {
           .build();
 
   public void removeFragmentStatusListener(final QueryId queryId) {
-    logger.debug("Removing fragment status listener for queryId {}.", queryId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+    }
     listeners.remove(queryId);
   }
 
   public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener)
       throws ForemanSetupException {
-    logger.debug("Adding fragment status listener for queryId {}.", queryId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Adding fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+    }
     final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
     if (old != null) {
       throw new ForemanSetupException (
@@ -69,7 +73,9 @@ public class WorkEventBus {
   }
 
   public void addFragmentManager(final FragmentManager fragmentManager) {
-    logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+    if (logger.isDebugEnabled()) {
+      logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+    }
     final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
       if (old != null) {
         throw new IllegalStateException(
@@ -84,7 +90,9 @@ public class WorkEventBus {
   public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
     // check if this was a recently canceled fragment.  If so, throw away message.
     if (recentlyFinishedFragments.asMap().containsKey(handle)) {
-      logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+      }
       return null;
     }
 
@@ -98,7 +106,9 @@ public class WorkEventBus {
   }
 
   public void removeFragmentManager(final FragmentHandle handle) {
-    logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    }
     recentlyFinishedFragments.put(handle,  1);
     managers.remove(handle);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 3c807d5..3beae89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
@@ -114,7 +115,7 @@ public class QueryResultHandler {
         // A successful completion/canceled case--pass on via resultArrived
 
         try {
-          resultsListener.queryCompleted();
+          resultsListener.queryCompleted(queryState);
         } catch ( Exception e ) {
           resultsListener.submissionFailed(UserException.systemError(e).build());
         }
@@ -198,8 +199,8 @@ public class QueryResultHandler {
   private static class BufferingResultsListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
-    private volatile boolean finished = false;
     private volatile UserException ex;
+    private volatile QueryState queryState;
     private volatile UserResultsListener output;
     private volatile ConnectionThrottle throttle;
 
@@ -212,20 +213,22 @@ public class QueryResultHandler {
         if (ex != null) {
           l.submissionFailed(ex);
           return true;
-        } else if (finished) {
-          l.queryCompleted();
+        } else if (queryState != null) {
+          l.queryCompleted(queryState);
+          return true;
         }
 
-        return finished;
+        return false;
       }
     }
 
     @Override
-    public void queryCompleted() {
-      finished = true;
+    public void queryCompleted(QueryState state) {
+      assert queryState == null;
+      this.queryState = state;
       synchronized (this) {
         if (output != null) {
-          output.queryCompleted();
+          output.queryCompleted(state);
         }
       }
     }
@@ -245,7 +248,11 @@ public class QueryResultHandler {
 
     @Override
     public void submissionFailed(UserException ex) {
-      finished = true;
+      assert queryState == null;
+      // There is one case in which submissionFailed() is called even though the query did not fail on the server
+      // side: when UserResultsListener.batchArrived() throws an exception, QueryResultHandler.dataArrived() passes
+      // that exception to submissionFailed().
+      queryState = QueryState.FAILED;
       synchronized (this) {
         if (output == null) {
           this.ex = ex;
@@ -255,10 +262,6 @@ public class QueryResultHandler {
       }
     }
 
-    public boolean isFinished() {
-      return finished;
-    }
-
     @Override
     public void queryIdArrived(QueryId queryId) {
     }
@@ -281,8 +284,10 @@ public class QueryResultHandler {
     @Override
     public void success(QueryId queryId, ByteBuf buf) {
       resultsListener.queryIdArrived(queryId);
-      logger.debug("Received QueryId {} successfully.  Adding results listener {}.",
-                   queryId, resultsListener);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Received QueryId {} successfully. Adding results listener {}.",
+          QueryIdHelper.getQueryId(queryId), resultsListener);
+      }
       UserResultsListener oldListener =
           queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index f928476..e422a3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.user;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 
 public interface UserResultsListener {
 
@@ -38,8 +39,9 @@ public interface UserResultsListener {
   /**
    * The query has completed (successsful completion or cancellation). The listener will not receive any other
    * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED)
+   * @param state the terminal, non-failing state returned by the server (COMPLETED or CANCELLED)
    */
-  void queryCompleted();
+  void queryCompleted(QueryState state);
 
   /**
    * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 877bc08..9e929de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -97,16 +97,18 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
     case RpcType.RUN_QUERY_VALUE:
       logger.debug("Received query to run.  Returning query handle.");
       try {
-        RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
-        return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
+        final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final QueryId queryId = worker.submitWork(connection, query);
+        return new Response(RpcType.QUERY_HANDLE, queryId);
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding RunQuery body.", e);
       }
 
     case RpcType.CANCEL_QUERY_VALUE:
       try {
-        QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
-        return new Response(RpcType.ACK, worker.cancelQuery(queryId));
+        final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final Ack ack = worker.cancelQuery(queryId);
+        return new Response(RpcType.ACK, ack);
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding QueryId body.", e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 19d77b0..e631792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.rpc.user;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import net.hydromatic.optiq.SchemaPlus;
 
@@ -42,6 +43,16 @@ public class UserSession {
   private UserCredentials credentials;
   private Map<String, String> properties;
   private OptionManager sessionOptions;
+  private final AtomicInteger queryCount;
+
+  /**
+   * Implementations of this interface are allowed to increment queryCount.
+   * {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface.
+   * No other core class should implement this interface. Test classes may implement it (see ControlsInjectionUtil).
+   */
+  public static interface QueryCountIncrementer {
+    public void increment(final UserSession session);
+  }
 
   public static class Builder {
     UserSession userSession;
@@ -56,7 +67,7 @@ public class UserSession {
     }
 
     public Builder withOptionManager(OptionManager systemOptions) {
-      userSession.sessionOptions = new SessionOptionManager(systemOptions);
+      userSession.sessionOptions = new SessionOptionManager(systemOptions, userSession);
       return this;
     }
 
@@ -87,7 +98,9 @@ public class UserSession {
     }
   }
 
-  private UserSession() { }
+  private UserSession() {
+    queryCount = new AtomicInteger(0);
+  }
 
   public boolean isSupportComplexTypes() {
     return supportComplexTypes;
@@ -105,6 +118,15 @@ public class UserSession {
     return credentials;
   }
 
+  public void incrementQueryCount(final QueryCountIncrementer incrementer) {
+    assert incrementer != null;
+    queryCount.incrementAndGet();
+  }
+
+  public int getQueryCount() {
+    return queryCount.get();
+  }
+
   /**
    * Update the schema path for the session.
    * @param fullPath The desired path to set to.

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index dbf3c74..6fdbfca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.testing.SimulatedExceptions;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Preconditions;
@@ -59,7 +58,6 @@ public class DrillbitContext {
   private final PStoreProvider provider;
   private final CodeCompiler compiler;
   private final ExecutorService executor;
-  private final SimulatedExceptions simulatedExceptions = new SimulatedExceptions();
 
   public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord,
       Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
@@ -164,7 +162,4 @@ public class DrillbitContext {
     return executor;
   }
 
-  public SimulatedExceptions getSimulatedExceptions() {
-    return simulatedExceptions;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
index 4e90616..682bfea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -67,8 +67,6 @@ public abstract class FallbackOptionManager extends BaseOptionManager {
   private void setValidatedOption(OptionValue value) {
     if (!setLocalOption(value)) {
       fallback.setOption(value);
-    }else{
-      setLocalOption(value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
index 0b8811a..0fed1fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -39,6 +39,7 @@ public interface OptionManager extends Iterable<OptionValue> {
 
   public interface OptionAdmin {
     public void registerOptionType(OptionValidator validator);
+    public OptionValidator getValidator(String name);
     public void validate(OptionValue v) throws SetOptionException;
     public OptionValue validate(String name, SqlLiteral value, OptionType optionType) throws SetOptionException;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
index 43071e7..90ce3a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -53,6 +53,32 @@ public abstract class OptionValidator {
     return optionName;
   }
 
+  /**
+   * This function returns true if and only if the validator is meant for a short-lived option.
+   *
+   * NOTE: By default, options are not short-lived. So, if a derived class is meant for a short-lived option,
+   * that class must do two things:
+   * (1) override this method to return true, and
+   * (2) return the number of queries for which the option is valid through {@link #getTtl}.
+   * E.g. {@link org.apache.drill.exec.testing.ExecutionControls.ControlsOptionValidator}
+   * @return true if this validator is for a short-lived option, false otherwise
+   */
+  public boolean isShortLived() {
+    return false;
+  }
+
+  /**
+   * If an option is short-lived, this returns the number of queries for which the option is valid.
+   * Please read the note at {@link #isShortLived}
+   * @return number of queries for which the option should be valid
+   */
+  public int getTtl() {
+    if (!isShortLived()) {
+      throw new UnsupportedOperationException("This option is not short-lived.");
+    }
+    return 0;
+  }
+
   public String getDefaultString() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index c3de190..340358f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,13 +17,86 @@
  */
 package org.apache.drill.exec.server.options;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class SessionOptionManager extends InMemoryOptionManager {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
 
-  public SessionOptionManager(OptionManager systemOptions) {
+  private final UserSession session;
+
+  /**
+   * Map of short-lived options. Key: option name, Value: range [ start, end ) of session query counts in which the option is live
+   */
+  private final ConcurrentHashMap<String, ImmutablePair<Integer, Integer>> shortLivedOptions = new ConcurrentHashMap<>();
+
+  public SessionOptionManager(final OptionManager systemOptions, final UserSession session) {
     super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
+    this.session = session;
+  }
+
+  @Override
+  boolean setLocalOption(final OptionValue value) {
+    final boolean set = super.setLocalOption(value);
+    final String name = value.name;
+    final OptionValidator validator = fallback.getAdmin().getValidator(name);
+    final boolean shortLived = validator.isShortLived();
+    if (set && shortLived) {
+      final int start = session.getQueryCount() + 1; // start from the next query
+      final int ttl = validator.getTtl();
+      final int end = start + ttl;
+      shortLivedOptions.put(name, new ImmutablePair<>(start, end));
+    }
+    return set;
+  }
+
+  @Override
+  OptionValue getLocalOption(final String name) {
+    final OptionValue value = options.get(name);
+    if (shortLivedOptions.containsKey(name)) {
+      if (withinRange(value)) {
+        return value;
+      }
+      final int queryNumber = session.getQueryCount();
+      final int start = shortLivedOptions.get(name).getLeft();
+      // option is not in effect if queryNumber < start
+      if (queryNumber < start) {
+        return fallback.getAdmin().getValidator(name).getDefault();
+      // otherwise queryNumber >= end: the option has expired, so reset it
+      } else {
+        options.remove(name);
+        shortLivedOptions.remove(name);
+        return null; // fallback takes effect
+      }
+    }
+    return value;
+  }
+
+  private boolean withinRange(final OptionValue value) {
+    final int queryNumber = session.getQueryCount();
+    final ImmutablePair<Integer, Integer> pair = shortLivedOptions.get(value.name);
+    final int start = pair.getLeft();
+    final int end = pair.getRight();
+    return start <= queryNumber && queryNumber < end;
+  }
+
+  private final Predicate<OptionValue> isLive = new Predicate<OptionValue>() {
+    @Override
+    public boolean apply(final OptionValue value) {
+      final String name = value.name;
+      return !shortLivedOptions.containsKey(name) || withinRange(value);
+    }
+  };
+
+  @Override
+  Iterable<OptionValue> optionIterable() {
+    final Collection<OptionValue> liveOptions = Collections2.filter(options.values(), isLive);
+    return liveOptions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4471d4f..a745479 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -101,7 +101,7 @@ public class SystemOptionManager extends BaseOptionManager {
       QueryClassLoader.JAVA_COMPILER_DEBUG,
       ExecConstants.ENABLE_VERBOSE_ERRORS,
       ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
-      ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR,
+      ExecConstants.DRILLBIT_CONTROLS_VALIDATOR,
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
   };
 
@@ -216,6 +216,11 @@ public class SystemOptionManager extends BaseOptionManager {
     }
 
     @Override
+    public OptionValidator getValidator(final String name) {
+      return knownOptions.get(name);
+    }
+
+    @Override
     public void validate(final OptionValue v) throws SetOptionException {
       final OptionValidator validator = knownOptions.get(v.name);
       if (validator == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index b9721cc..e7b1eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.server.options;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.common.exceptions.ExpressionParsingException;
 import org.apache.drill.exec.server.options.OptionValue.Kind;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
@@ -155,6 +157,35 @@ public class TypeValidators {
     }
   }
 
+  /**
+   * Validator for POJO passed in as JSON string
+   */
+  public static class JsonStringValidator extends StringValidator {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+    private final Class<?> clazz;
+
+    public JsonStringValidator(final String name, final Class<?> clazz, final String def) {
+      super(name, def);
+      this.clazz = clazz;
+      validateJson(def, clazz);
+    }
+
+    @Override
+    public void validate(final OptionValue v) throws ExpressionParsingException {
+      super.validate(v);
+      validateJson(v.string_val, clazz);
+    }
+
+    private static void validateJson(final String jsonString, final Class<?> clazz) {
+      try {
+        mapper.readValue(jsonString, clazz);
+      } catch (IOException e) {
+        throw new ExpressionParsingException("Invalid JSON string (" + jsonString + ") for class " + clazz.getName(), e);
+      }
+    }
+  }
+
   public static abstract class TypeValidator extends OptionValidator {
     private final Kind kind;
     private final OptionValue defaultValue;

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 62f5bdb..aa43aa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -136,7 +137,7 @@ public class QueryWrapper {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       latch.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
index 68cbf08..61f0d67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
@@ -17,38 +17,48 @@
  */
 package org.apache.drill.exec.testing;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Injection for a single exception. Specifies how many times to inject it, and how many times to skip
  * injecting it before the first injection. This class is used internally for tracking injected
  * exceptions; injected exceptions are specified via the
- * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_EXCEPTION_INJECTIONS} system option.
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
  */
-public class ExceptionInjection {
-  private final String desc; // description of the injection site
-
-  private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
-  private final AtomicInteger nThrow; // the number of times to do the injection, after any skips; starts > 0
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class ExceptionInjection extends Injection {
 
   private final Class<? extends Throwable> exceptionClass;
 
-  /**
-   * Constructor.
-   *
-   * @param desc description of the injection site; useful for multiple injections in a single class
-   * @param nSkip non-negative number of times to skip injecting the exception
-   * @param nFire positive number of times to inject the exception
-   * @param exceptionClass
-   */
-  public ExceptionInjection(final String desc, final int nSkip, final int nFire,
-      final Class<? extends Throwable> exceptionClass) {
-    this.desc = desc;
-    this.nSkip = new AtomicInteger(nSkip);
-    this.nThrow = new AtomicInteger(nFire);
-    this.exceptionClass = exceptionClass;
+  @JsonCreator // ensures instances are created only through JSON
+  private ExceptionInjection(@JsonProperty("address") final String address,
+                             @JsonProperty("port") final int port,
+                             @JsonProperty("siteClass") final String siteClass,
+                             @JsonProperty("desc") final String desc,
+                             @JsonProperty("nSkip") final int nSkip,
+                             @JsonProperty("nFire") final int nFire,
+                             @JsonProperty("exceptionClass") String classString) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, nFire);
+    final Class<?> clazz;
+    try {
+      clazz = Class.forName(classString);
+    } catch (ClassNotFoundException e) {
+      throw new InjectionConfigurationException("Injected exceptionClass not found.", e);
+    }
+
+    if (!Throwable.class.isAssignableFrom(clazz)) {
+      throw new InjectionConfigurationException("Injected exceptionClass is not a Throwable.");
+    }
+
+    @SuppressWarnings("unchecked")
+    final Class<? extends Throwable> exceptionClazz = (Class<? extends Throwable>) clazz;
+    this.exceptionClass = exceptionClazz;
   }
 
   /**
@@ -57,29 +67,25 @@ public class ExceptionInjection {
    * @return the exception to throw, or null if it isn't time to throw it
    */
   private Throwable constructException() {
-    final int remainingSkips = nSkip.decrementAndGet();
-    if (remainingSkips >= 0) {
-      return null;
-    }
-
-    final int remainingFirings = nThrow.decrementAndGet();
-    if (remainingFirings < 0) {
+    if (! injectNow()) {
       return null;
     }
 
     // if we get here, we should throw the specified exception
-    Constructor<?> constructor;
+    final Constructor<?> constructor;
     try {
       constructor = exceptionClass.getConstructor(String.class);
-    } catch(NoSuchMethodException e) {
-      throw new RuntimeException("No constructor found that takes a single String argument");
+    } catch (NoSuchMethodException e) {
+      // the constructor only verifies the class is a Throwable, so keep the cause if this fails.
+      throw new RuntimeException("No constructor found that takes a single String argument.", e);
     }
 
-    Throwable throwable;
+    final Throwable throwable;
     try {
-      throwable = (Throwable) constructor.newInstance(desc);
-    } catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      throw new IllegalStateException("Couldn't construct exception instance", e);
+      throwable = (Throwable) constructor.newInstance(getDesc());
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // reflective construction failed; the cause is preserved for diagnosis.
+      throw new IllegalStateException("Couldn't construct exception instance.", e);
     }
 
     return throwable;
@@ -105,16 +111,16 @@ public class ExceptionInjection {
       throw e;
     }
 
-    throw new IllegalStateException("throwable was not an unchecked exception");
+    throw new IllegalStateException("Throwable was not an unchecked exception.");
   }
 
   /**
    * Throw the checked exception specified by this injection.
    *
    * @param exceptionClass the class of the exception to throw
-   * @throws T if it is time to throw the exception
+   * @throws T                     if it is time to throw the exception
    * @throws IllegalStateException if it is time to throw the exception, and the exception's class
-   *   is incompatible with the class specified by the injection
+   *                               is incompatible with the class specified by the injection
    */
   public <T extends Throwable> void throwChecked(final Class<T> exceptionClass) throws T {
     final Throwable throwable = constructException();
@@ -128,6 +134,6 @@ public class ExceptionInjection {
     }
 
     throw new IllegalStateException("Constructed Throwable(" + throwable.getClass().getName()
-        + ") is incompatible with exceptionClass("+ exceptionClass.getName() + ")");
+      + ") is incompatible with exceptionClass(" + exceptionClass.getName() + ")");
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
deleted file mode 100644
index 54bc351..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import org.apache.drill.exec.server.DrillbitContext;
-
-/**
- * Injects exceptions at execution time for testing. Any class that wants to simulate exceptions
- * for testing should have it's own private static instance of an injector (similar to the use
- * of loggers).
- *
- * <p>See {@link org.apache.drill.exec.testing.TestExceptionInjection} for examples of
- * use.
- */
-public class ExceptionInjector {
-  private final Class<?> clazz; // the class that owns this injector
-
-  /**
-   * Constructor. Classes should use the static {@link #getInjector()} method to obtain
-   * their injector.
-   *
-   * @param clazz the owning class
-   */
-  private ExceptionInjector(final Class<?> clazz) {
-    this.clazz = clazz;
-  }
-
-  /**
-   * Create an injector.
-   *
-   * @param clazz the owning class
-   * @return the newly created injector
-   */
-  public static ExceptionInjector getInjector(final Class<?> clazz) {
-    return new ExceptionInjector(clazz);
-  }
-
-  /**
-   * Get the injector's owning class.
-   *
-   * @return the injector's owning class
-   */
-  public Class<?> getSiteClass() {
-    return clazz;
-  }
-
-  /**
-   * Lookup an injection within this class that matches the site description.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * @return the injection, if there is one; null otherwise
-   */
-  private ExceptionInjection getInjection(final DrillbitContext drillbitContext, final String desc) {
-    final SimulatedExceptions simulatedExceptions = drillbitContext.getSimulatedExceptions();
-    final ExceptionInjection exceptionInjection = simulatedExceptions.lookupInjection(drillbitContext, this, desc);
-    return exceptionInjection;
-  }
-
-  /**
-   * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
-   * for it to be thrown.
-   *
-   * <p>Implementors use this in their code at a site where they want to simulate an exception
-   * during testing.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * throws the exception specified by the injection, if it is time
-   */
-  public void injectUnchecked(final DrillbitContext drillbitContext, final String desc) {
-    final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
-    if (exceptionInjection != null) {
-      exceptionInjection.throwUnchecked();
-    }
-  }
-
-  /**
-   * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
-   * for it to be thrown.
-   *
-   * <p>Implementors use this in their code at a site where they want to simulate an exception
-   * during testing.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * @param exceptionClass the expected class of the exception (or a super class of it)
-   * @throws T the exception specified by the injection, if it is time
-   */
-  public <T extends Throwable> void injectChecked(
-      final DrillbitContext drillbitContext, final String desc, final Class<T> exceptionClass) throws T {
-    final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
-    if (exceptionInjection != null) {
-      exceptionInjection.throwChecked(exceptionClass);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
new file mode 100644
index 0000000..1171bf8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.server.options.TypeValidators.TypeValidator;
+import org.apache.drill.exec.testing.InjectionSite.InjectionSiteKeyDeserializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tracks the simulated controls that will be injected for testing purposes.
+ */
+public final class ExecutionControls {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionControls.class);
+
+  // used to map JSON specified injections to POJOs
+  public static final ObjectMapper controlsOptionMapper = new ObjectMapper();
+
+  static {
+    controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
+  }
+
+  // Jackson MixIn for all types of injections
+  @JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type")
+  @JsonSubTypes({
+    @Type(value = ExceptionInjection.class, name = "exception"),
+    @Type(value = PauseInjection.class, name = "pause")})
+  public static abstract class InjectionMixIn {
+  }
+
+  /**
+   * Validates the JSON given for {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS}: the
+   * value must be SESSION-level and deserialize into {@link Controls}. Controls are short-lived options.
+   */
+  public static class ControlsOptionValidator extends TypeValidator {
+
+    private final int ttl; // the number of queries for which this option is valid
+
+    /**
+     * Constructor for controls option validator; the option is registered as a STRING-kind SESSION option.
+     * @param name the name of the validator
+     * @param def  the default JSON, specified as string
+     * @param ttl  the number of queries for which this option should be valid; must be positive
+     */
+    public ControlsOptionValidator(final String name, final String def, final int ttl) {
+      super(name, OptionValue.Kind.STRING, OptionValue.createString(OptionType.SESSION, name, def));
+      assert ttl > 0;
+      this.ttl = ttl;
+    }
+
+    @Override
+    public int getTtl() {
+      return ttl;
+    }
+
+    @Override
+    public boolean isShortLived() {
+      return true;
+    }
+
+    @Override
+    public void validate(final OptionValue v) throws ExpressionParsingException {
+      if (v.type != OptionType.SESSION) {
+        throw new ExpressionParsingException("Controls can be set only at SESSION level.");
+      }
+      final String jsonString = v.string_val;
+      try {
+        controlsOptionMapper.readValue(jsonString, Controls.class); // parsed only to validate; result unused
+      } catch (IOException e) {
+        throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
+      }
+    }
+  }
+
+  /**
+   * POJO used to parse JSON-specified controls.
+   */
+  public static class Controls {
+    public Collection<? extends Injection> injections;
+  }
+
+  /**
+   * The default value for controls.
+   */
+  public static final String DEFAULT_CONTROLS = "{}";
+
+  /**
+   * Caches the currently specified controls.
+   */
+  @JsonDeserialize(keyUsing = InjectionSiteKeyDeserializer.class)
+  private final Map<InjectionSite, Injection> controls = new HashMap<>();
+
+  private final DrillbitEndpoint endpoint; // the current endpoint
+
+  public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+
+    final OptionValue optionValue = options.getOption(ExecConstants.DRILLBIT_CONTROL_INJECTIONS);
+    if (optionValue == null) {
+      return;
+    }
+
+    final String opString = optionValue.string_val;
+    final Controls controls;
+    try {
+      controls = controlsOptionMapper.readValue(opString, Controls.class);
+    } catch (IOException e) {
+      // This never happens. opString must have been validated.
+      logger.warn("Could not parse injections. Injections must have been validated before this point.");
+      throw new DrillRuntimeException("Could not parse injections.", e);
+    }
+    if (controls.injections == null) {
+      return;
+    }
+
+    logger.debug("Adding control injections: \n{}", opString);
+    for (final Injection injection : controls.injections) {
+      this.controls.put(new InjectionSite(injection.getSiteClass(), injection.getDesc()), injection);
+    }
+  }
+
+  /**
+   * Look for an exception injection matching the given injector, site description and endpoint.
+   * The match is cast unchecked, so a non-exception injection at this site raises ClassCastException.
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the exception injection, if there is one for the injector, site and endpoint; null otherwise
+   */
+  public ExceptionInjection lookupExceptionInjection(final ExecutionControlsInjector injector, final String desc) {
+    final Injection injection = lookupInjection(injector, desc);
+    return injection != null ? (ExceptionInjection) injection : null;
+  }
+
+  /**
+   * Look for a pause injection matching the given injector, site description and endpoint.
+   * The match is cast unchecked, so a non-pause injection at this site raises ClassCastException.
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the pause injection, if there is one for the injector, site and endpoint; null otherwise
+   */
+  public PauseInjection lookupPauseInjection(final ExecutionControlsInjector injector, final String desc) {
+    final Injection injection = lookupInjection(injector, desc);
+    return injection != null ? (PauseInjection) injection : null;
+  }
+
+  private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
+    if (controls.isEmpty()) {
+      return null;
+    }
+
+    // lookup the request
+    final InjectionSite site = new InjectionSite(injector.getSiteClass(), desc);
+    final Injection injection = controls.get(site);
+    if (injection == null) {
+      return null;
+    }
+    // return only if injection was meant for this drillbit
+    return injection.isValidForBit(endpoint) ? injection : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
new file mode 100644
index 0000000..4b1cd0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.util.AssertionUtil;
+import org.slf4j.Logger;
+
+/**
+ * Injects exceptions and pauses at execution time for testing. Any class that wants to simulate exceptions
+ * or inject pauses for testing should have its own private static instance of an injector (similar to the use
+ * of loggers). Injection sites use either {@link org.apache.drill.exec.ops.FragmentContext} or
+ * {@link org.apache.drill.exec.ops.QueryContext}. See {@link org.apache.drill.exec.testing.TestExceptionInjection} and
+ * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use.
+ */
+public class ExecutionControlsInjector {
+//  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionControlsInjector.class);
+
+  private final Class<?> clazz; // the class that owns this injector
+
+  /**
+   * Constructor. Classes should use the static {@link #getInjector} method to obtain their injector.
+   *
+   * @param clazz the owning class
+   */
+  protected ExecutionControlsInjector(final Class<?> clazz) {
+    this.clazz = clazz;
+  }
+
+  /**
+   * Create an injector for the given class; a {@link NoOpControlsInjector} is returned if assertions are disabled.
+   *
+   * @param clazz the owning class
+   * @return the newly created injector (a NoOpControlsInjector when assertions are not enabled)
+   */
+  public static ExecutionControlsInjector getInjector(final Class<?> clazz) {
+    if (AssertionUtil.isAssertionsEnabled()) {
+      return new ExecutionControlsInjector(clazz);
+    } else {
+      return new NoOpControlsInjector(clazz);
+    }
+  }
+
+  /**
+   * Get the injector's owning class.
+   *
+   * @return the injector's owning class
+   */
+  public Class<?> getSiteClass() {
+    return clazz;
+  }
+
+  /**
+   * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
+   * for it to be thrown.
+   * <p/>
+   * <p>Implementors use this in their code at a site where they want to simulate an exception
+   * during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   *                          throws the exception specified by the injection, if it is time
+   */
+  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+    final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
+    if (exceptionInjection != null) {
+      exceptionInjection.throwUnchecked();
+    }
+    return this;
+  }
+
+  /**
+   * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
+   * for it to be thrown.
+   * <p/>
+   * <p>Implementors use this in their code at a site where they want to simulate an exception
+   * during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param exceptionClass    the expected class of the exception (or a super class of it)
+   * @throws T the exception specified by the injection, if it is time
+   */
+  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+    final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
+    final ExceptionInjection exceptionInjection = executionControls.lookupExceptionInjection(this, desc);
+    if (exceptionInjection != null) {
+      exceptionInjection.throwChecked(exceptionClass);
+    }
+    return this;
+  }
+
+  /**
+   * Pauses at this point, if such an injection is specified (i.e. matches the site description).
+   * <p/>
+   * <p>Implementors use this in their code at a site where they want to simulate a pause
+   * during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param logger            logger of the class containing the injection site
+   */
+  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+                                               final Logger logger) {
+    final PauseInjection pauseInjection =
+      executionControls.lookupPauseInjection(this, desc);
+
+    if (pauseInjection != null) {
+      logger.debug("Pausing at {}", desc);
+      pauseInjection.pause();
+      logger.debug("Resuming at {}", desc);
+    }
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
new file mode 100644
index 0000000..96fed3a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The base class for all types of injections (currently, pause and exception).
+ */
+public abstract class Injection {
+
+  protected final String address;  // the address of the drillbit on which to inject
+  protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
+  private final Class<?> siteClass; // the class where the injection should happen
+  private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+  private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
+  private final AtomicInteger nFire;  // the number of times to do the injection, after any skips; starts > 0
+
+  protected Injection(final String address, final int port, final String siteClass, final String desc,
+                      final int nSkip, final int nFire) throws InjectionConfigurationException {
+    if (desc == null || desc.isEmpty()) {
+      throw new InjectionConfigurationException("Injection desc is null or empty.");
+    }
+
+    if (nSkip < 0) {
+      throw new InjectionConfigurationException("Injection nSkip is not non-negative.");
+    }
+
+    if (nFire <= 0) {
+      throw new InjectionConfigurationException("Injection nFire is non-positive.");
+    }
+    try {
+      this.siteClass = Class.forName(siteClass);
+    } catch (ClassNotFoundException e) {
+      throw new InjectionConfigurationException("Injection siteClass not found.", e);
+    }
+
+    this.address = address;
+    this.port = port;
+    this.desc = desc;
+    this.nSkip = new AtomicInteger(nSkip);
+    this.nFire = new AtomicInteger(nFire);
+  }
+
+  /**
+   * This function checks if it is the right time for the injection to happen.
+   *
+   * @return if the injection should be injected now
+   */
+  protected final boolean injectNow() {
+    return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
+  }
+
+  public String getDesc() {
+    return desc;
+  }
+
+  public Class<?> getSiteClass() {
+    return siteClass;
+  }
+
+  // If the address is null, the injection must happen on every drillbit that reaches the specified site.
+  public final boolean isValidForBit(final DrillbitEndpoint endpoint) {
+    return address == null ||
+      (address.equals(endpoint.getAddress()) && port == endpoint.getUserPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
new file mode 100644
index 0000000..4fcb33a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * Checked exception raised when an injection has been configured incorrectly.
+ */
+public class InjectionConfigurationException extends Exception {
+
+  /**
+   * @param message description of the configuration problem
+   */
+  public InjectionConfigurationException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param throwable the underlying cause of the configuration problem
+   */
+  public InjectionConfigurationException(Throwable throwable) {
+    super(throwable);
+  }
+
+  /**
+   * @param message   description of the configuration problem
+   * @param throwable the underlying cause of the configuration problem
+   */
+  public InjectionConfigurationException(String message, Throwable throwable) {
+    super(message, throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
index 9e19fdd..2bb9acc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
@@ -17,8 +17,13 @@
  */
 package org.apache.drill.exec.testing;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
 import com.google.common.base.Preconditions;
 
+import java.io.IOException;
+
 public class InjectionSite {
   private final Class<?> clazz;
   private final String desc;
@@ -31,14 +36,6 @@ public class InjectionSite {
     this.desc = desc;
   }
 
-  public Class<?> getSiteClass() {
-    return clazz;
-  }
-
-  public String getDesc() {
-    return desc;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (o == null) {
@@ -65,8 +62,35 @@ public class InjectionSite {
     return true;
   }
 
+  private static final String SEPARATOR = ",";
+
+  /**
+   * Renders this site as the class name, followed by the separator, followed by the description.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder(clazz.getName());
+    text.append(SEPARATOR).append(desc);
+    return text.toString();
+  }
+
   @Override
   public int hashCode() {
     return (clazz.hashCode() + 13) ^ (1 - desc.hashCode());
   }
+
+  /**
+   * Key Deserializer for InjectionSite.
+   * Since JSON object keys must be strings, deserialize from a string of the form
+   * {@code <class name><SEPARATOR><desc>}.
+   */
+  public static class InjectionSiteKeyDeserializer extends KeyDeserializer {
+
+    /**
+     * Rebuilds an {@link InjectionSite} from its string key.
+     *
+     * @param key     the JSON object key, expected to hold a class name, the separator and a description
+     * @param context the deserialization context (not used)
+     * @return the InjectionSite described by the key
+     * @throws IOException if the key has no separator, has an empty class name or description,
+     *                     or names a class that cannot be found
+     */
+    @Override
+    public Object deserializeKey(final String key, final DeserializationContext context)
+      throws IOException, JsonProcessingException {
+      // Split at the first separator only, so that a description which itself contains the
+      // separator is kept intact instead of being truncated.
+      final int separatorIndex = key.indexOf(SEPARATOR);
+      if (separatorIndex <= 0 || separatorIndex + SEPARATOR.length() >= key.length()) {
+        throw new IOException("Malformed injection site key '" + key + "'; expected <class name>"
+          + SEPARATOR + "<desc>.");
+      }
+      final String className = key.substring(0, separatorIndex);
+      final String siteDesc = key.substring(separatorIndex + SEPARATOR.length());
+
+      final Class<?> siteClass;
+      try {
+        siteClass = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Class " + className + " not found.", e);
+      }
+      return new InjectionSite(siteClass, siteDesc);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
new file mode 100644
index 0000000..80d9790
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.slf4j.Logger;
+
+/**
+ * An {@link ExecutionControlsInjector} whose injection methods do nothing: every call returns
+ * immediately without injecting any control.
+ */
+public final class NoOpControlsInjector extends ExecutionControlsInjector {
+
+  /**
+   * @param clazz the class handed to the parent injector
+   */
+  protected NoOpControlsInjector(final Class<?> clazz) {
+    super(clazz);
+  }
+
+  /**
+   * Does nothing.
+   *
+   * @return this injector
+   */
+  @Override
+  public ExecutionControlsInjector injectUnchecked(
+    final ExecutionControls executionControls,
+    final String desc) {
+    return this;
+  }
+
+  /**
+   * Does nothing; no exception is thrown.
+   *
+   * @return this injector
+   */
+  @Override
+  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+    final ExecutionControls executionControls,
+    final String desc,
+    final Class<T> exceptionClass) throws T {
+    return this;
+  }
+
+  /**
+   * Does nothing; the calling thread is not paused.
+   *
+   * @return this injector
+   */
+  @Override
+  public ExecutionControlsInjector injectPause(
+    final ExecutionControls executionControls,
+    final String desc,
+    final Logger logger) {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
new file mode 100644
index 0000000..e5f9c9c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+/**
+ * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
+ * injected pauses; these pauses are specified via
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class PauseInjection extends Injection {
+
+  // Duration of the pause in milliseconds; always positive (enforced by the constructor).
+  private final long millis;
+
+  /**
+   * Creates a pause injection from its JSON properties.
+   *
+   * @param millis how long to pause, in milliseconds; must be positive
+   * @throws InjectionConfigurationException if {@code millis} is not positive, or if the parent
+   *         constructor rejects one of the other properties
+   */
+  @JsonCreator // ensures instances are created only through JSON
+  private PauseInjection(@JsonProperty("address") final String address,
+                         @JsonProperty("port") final int port,
+                         @JsonProperty("siteClass") final String siteClass,
+                         @JsonProperty("desc") final String desc,
+                         @JsonProperty("nSkip") final int nSkip,
+                         @JsonProperty("nFire") final int nFire,
+                         @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, nFire);
+    if (millis <= 0) {
+      throw new InjectionConfigurationException("Pause millis is non-positive.");
+    }
+    this.millis = millis;
+  }
+
+  /**
+   * Sleeps the calling thread for the configured number of milliseconds, but only when
+   * {@link #injectNow()} says the injection should happen on this call.
+   *
+   * @throws DrillRuntimeException if the thread is interrupted while sleeping; the thread's interrupt
+   *         status is restored and the InterruptedException is attached as the cause
+   */
+  public void pause() {
+    if (! injectNow()) {
+      return;
+    }
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe the interruption,
+      // and keep the original exception as the cause instead of dropping it.
+      Thread.currentThread().interrupt();
+      throw new DrillRuntimeException("Well, I should be sleeping.", e);
+    }
+  }
+}