You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/21 05:00:22 UTC

[2/2] drill git commit: DRILL-2383: Support to inject exceptions and pauses in various components of Drill + Controls are fired only if assertions are enabled + Controls can be introduced in any class that has access to FragmentContext/QueryContext + Con...

DRILL-2383: Support to inject exceptions and pauses in various components of Drill
+ Controls are fired only if assertions are enabled
+ Controls can be introduced in any class that has access to FragmentContext/QueryContext
+ Controls can be fired by altering the DRILLBIT_CONTROL_INJECTIONS session option
+ Renames: SimulatedExceptions => ExecutionControls, ExceptionInjector => ExecutionControlsInjector
+ Added injection sites in Foreman, DrillSqlWorker, FragmentExecutor
+ Unit tests in TestDrillbitResilience, TestExceptionInjection and TestPauseInjection

Other commits included:
+ DRILL-2437: Moved ExecutionControls from DrillbitContext to FragmentContext/QueryContext
+ DRILL-2382: Added address and port to Injection to specify drillbit
+ DRILL-2384: Added QueryState to SingleRowListener and assert that state is COMPLETED while testing

Other edits:
+ Support for short-lived session options in SessionOptionManager (using TTL in OptionValidator)
+ Introduced query count in UserSession
+ Added QueryState to queryCompleted() in UserResultsListener to check if COMPLETED/CANCELED
+ Added JSONStringValidator to TypeValidators
+ Log query id as string in DrillClient, WorkEventBus, QueryResultHandler
+ Use try..catch block only around else clause for OptionList in FragmentContext
+ Fixed drillbitContext spelling error in QueryContext
+ Fixed state transition when cancel() before run() in FragmentExecutor
+ Do not call setLocalOption twice in FallbackOptionManager
+ Show explicitly that submitWork() returns queryId in UserServer
+ Updated protocol/readme.txt to include an alternative way to generate sources


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/be8d9539
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/be8d9539
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/be8d9539

Branch: refs/heads/master
Commit: be8d953935461ee6567b0c4d96c503e8b04469d2
Parents: 21dfe7a
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Fri Apr 17 09:46:11 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Mon Apr 20 19:58:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   7 +-
 .../apache/drill/exec/client/DrillClient.java   |   7 +-
 .../exec/client/PrintingResultsListener.java    |   3 +-
 .../apache/drill/exec/ops/FragmentContext.java  |  26 +-
 .../org/apache/drill/exec/ops/QueryContext.java |  14 +-
 .../drill/exec/physical/impl/ScreenCreator.java |   4 +
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   6 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |  20 +-
 .../drill/exec/rpc/user/QueryResultHandler.java |  35 +-
 .../exec/rpc/user/UserResultsListener.java      |   4 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |  10 +-
 .../apache/drill/exec/rpc/user/UserSession.java |  26 +-
 .../drill/exec/server/DrillbitContext.java      |   5 -
 .../server/options/FallbackOptionManager.java   |   2 -
 .../exec/server/options/OptionManager.java      |   1 +
 .../exec/server/options/OptionValidator.java    |  26 ++
 .../server/options/SessionOptionManager.java    |  75 ++-
 .../server/options/SystemOptionManager.java     |   7 +-
 .../exec/server/options/TypeValidators.java     |  31 ++
 .../drill/exec/server/rest/QueryWrapper.java    |   3 +-
 .../drill/exec/testing/ExceptionInjection.java  |  84 ++--
 .../drill/exec/testing/ExceptionInjector.java   | 112 -----
 .../drill/exec/testing/ExecutionControls.java   | 193 ++++++++
 .../exec/testing/ExecutionControlsInjector.java | 129 ++++++
 .../apache/drill/exec/testing/Injection.java    |  84 ++++
 .../InjectionConfigurationException.java        |  35 ++
 .../drill/exec/testing/InjectionSite.java       |  40 +-
 .../exec/testing/NoOpControlsInjector.java      |  48 ++
 .../drill/exec/testing/PauseInjection.java      |  63 +++
 .../drill/exec/testing/SimulatedExceptions.java | 164 -------
 .../apache/drill/exec/work/foreman/Foreman.java |  14 +-
 .../exec/work/fragment/FragmentExecutor.java    |  55 ++-
 .../apache/drill/exec/work/user/UserWorker.java |   9 +
 .../java/org/apache/drill/BaseTestQuery.java    |   3 +-
 .../java/org/apache/drill/PlanningBase.java     |   8 +-
 .../org/apache/drill/SingleRowListener.java     |   3 +-
 .../exec/compile/TestClassTransformation.java   |   6 +-
 .../exec/server/TestDrillbitResilience.java     | 461 ++++++++++++++-----
 .../store/parquet/ParquetResultListener.java    |   3 +-
 .../store/parquet/TestParquetPhysicalPlan.java  |   3 +-
 .../exec/testing/ControlsInjectionUtil.java     |  95 ++++
 .../exec/testing/ExceptionInjectionUtil.java    |  82 ----
 .../exec/testing/TestExceptionInjection.java    | 257 ++++++++---
 .../drill/exec/testing/TestPauseInjection.java  |  96 ++++
 .../drill/jdbc/impl/DrillResultSetImpl.java     |   3 +-
 protocol/readme.txt                             |   2 +-
 46 files changed, 1682 insertions(+), 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7d89ac9..f7648b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.PowerOfTwoLongValidator;
 import org.apache.drill.exec.server.options.TypeValidators.StringValidator;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 public interface ExecConstants {
   public static final String ZK_RETRY_TIMES = "drill.exec.zk.retry.count";
@@ -216,7 +217,7 @@ public interface ExecConstants {
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
   public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
 
-  public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
-  public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
-      new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
+  public static final String DRILLBIT_CONTROL_INJECTIONS = "drill.exec.testing.controls";
+  public static final OptionValidator DRILLBIT_CONTROLS_VALIDATOR =
+    new ExecutionControls.ControlsOptionValidator(DRILLBIT_CONTROL_INJECTIONS, ExecutionControls.DEFAULT_CONTROLS, 1);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 336a149..0d29f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
@@ -326,7 +327,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       future.set(results);
     }
 
@@ -352,7 +353,9 @@ public class DrillClient implements Closeable, ConnectionThrottle {
 
     @Override
     public void queryIdArrived(QueryId queryId) {
-      logger.debug( "Query ID arrived: {}", queryId );
+      if (logger.isDebugEnabled()) {
+        logger.debug("Query ID arrived: {}", QueryIdHelper.getQueryId(queryId));
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 2bf35b1..64e7266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -60,7 +61,7 @@ public class PrintingResultsListener implements UserResultsListener {
   }
 
   @Override
-  public void queryCompleted() {
+  public void queryCompleted(QueryState state) {
     allocator.close();
     latch.countDown();
     System.out.println("Total rows returned: " + count.get());

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index c46613d..9400355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +64,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
   private final DrillbitContext context;
-  private final UserClientConnection connection;
+  private final UserClientConnection connection; // is null if attached to non-root fragment
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
@@ -73,6 +74,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private final OptionManager fragmentOptions;
   private final BufferManager bufferManager;
   private ExecutorState executorState;
+  private final ExecutionControls executionControls;
 
   private final SendingAccountor sendingAccountor = new SendingAccountor();
   private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@@ -98,17 +100,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
     logger.debug("Fragment max allocation: {}", fragment.getMemMax());
 
-    try {
-      final OptionList list;
-      if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
-        list = new OptionList();
-      } else {
+    final OptionList list;
+    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
+      list = new OptionList();
+    } else {
+      try {
         list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+      } catch (final Exception e) {
+        throw new ExecutionSetupException("Failure while reading plan options.", e);
       }
-      fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
-    } catch (final Exception e) {
-      throw new ExecutionSetupException("Failure while reading plan options.", e);
     }
+    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+
+    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
 
     // Add the fragment context to the root allocator.
     // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
@@ -288,6 +292,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     allocator.setFragmentLimit(limit);
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   @Override
   public void close() {
     waitForSendComplete();

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 2fa0b18..2dcac25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.server.options.QueryOptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
 import org.apache.drill.exec.store.PartitionExplorerImpl;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControls;
 
 // TODO except for a couple of tests, this is only created by Foreman
 // TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
@@ -52,6 +53,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
   private final OptionManager queryOptions;
   private final PlannerSettings plannerSettings;
   private final DrillOperatorTable table;
+  private final ExecutionControls executionControls;
 
   private final BufferAllocator allocator;
   private final BufferManager bufferManager;
@@ -65,10 +67,11 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
    */
   private boolean closed = false;
 
-  public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
-    this.drillbitContext = drllbitContext;
+  public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
+    this.drillbitContext = drillbitContext;
     this.session = session;
     queryOptions = new QueryOptionManager(session.getOptions());
+    executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
     plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
     plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
     table = new DrillOperatorTable(getFunctionRegistry());
@@ -78,7 +81,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
 
     try {
-      allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
+      allocator = drillbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
           MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
     } catch (OutOfMemoryException e) {
       throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
@@ -87,7 +90,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     bufferManager = new BufferManager(this.allocator, null);
   }
 
-
   public PlannerSettings getPlannerSettings() {
     return plannerSettings;
   }
@@ -120,6 +122,10 @@ public class QueryContext implements AutoCloseable, UdfUtilities {
     return queryOptions;
   }
 
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
   public DrillbitEndpoint getCurrentEndpoint() {
     return drillbitContext.getEndpoint();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 6b3caf4..2069d35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -34,9 +34,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 
 public class ScreenCreator implements RootCreator<Screen>{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ScreenCreator.class);
 
 
 
@@ -107,6 +109,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         materializer = new VectorRecordMaterializer(context, incoming);
         //$FALL-THROUGH$
       case OK:
+        injector.injectPause(context.getExecutionControls(), "sending-data", logger);
         QueryWritableBatch batch = materializer.convertNext();
         updateStats(batch);
         stats.startWait();
@@ -139,6 +142,7 @@ public class ScreenCreator implements RootCreator<Screen>{
       if (!oContext.isClosed()) {
         internalStop();
       }
+      injector.injectPause(context.getExecutionControls(), "send-complete", logger);
     }
 
     RecordBatch getIncoming() {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 9ca64d8..097b7bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -45,7 +45,9 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
 import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
 import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanException;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.rules.ReduceExpressionsRule;
@@ -60,7 +62,8 @@ import org.eigenbase.sql.parser.SqlParseException;
 import org.eigenbase.sql.parser.SqlParser;
 
 public class DrillSqlWorker {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(DrillSqlWorker.class);
 
   private final Planner planner;
   private final HepPlanner hepPlanner;
@@ -119,6 +122,7 @@ public class DrillSqlWorker {
   public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws ForemanSetupException {
     SqlNode sqlNode;
     try {
+      injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
       sqlNode = planner.parse(sql);
     } catch (SqlParseException e) {
       throw new QueryInputException("Failure parsing SQL. " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index a5a5441..d90096a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -45,13 +45,17 @@ public class WorkEventBus {
           .build();
 
   public void removeFragmentStatusListener(final QueryId queryId) {
-    logger.debug("Removing fragment status listener for queryId {}.", queryId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+    }
     listeners.remove(queryId);
   }
 
   public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener)
       throws ForemanSetupException {
-    logger.debug("Adding fragment status listener for queryId {}.", queryId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Adding fragment status listener for queryId {}.", QueryIdHelper.getQueryId(queryId));
+    }
     final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
     if (old != null) {
       throw new ForemanSetupException (
@@ -69,7 +73,9 @@ public class WorkEventBus {
   }
 
   public void addFragmentManager(final FragmentManager fragmentManager) {
-    logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+    if (logger.isDebugEnabled()) {
+      logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
+    }
     final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
       if (old != null) {
         throw new IllegalStateException(
@@ -84,7 +90,9 @@ public class WorkEventBus {
   public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
     // check if this was a recently canceled fragment.  If so, throw away message.
     if (recentlyFinishedFragments.asMap().containsKey(handle)) {
-      logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+      }
       return null;
     }
 
@@ -98,7 +106,9 @@ public class WorkEventBus {
   }
 
   public void removeFragmentManager(final FragmentHandle handle) {
-    logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    }
     recentlyFinishedFragments.put(handle,  1);
     managers.remove(handle);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 3c807d5..3beae89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcBus;
 import org.apache.drill.exec.rpc.RpcException;
@@ -114,7 +115,7 @@ public class QueryResultHandler {
         // A successful completion/canceled case--pass on via resultArrived
 
         try {
-          resultsListener.queryCompleted();
+          resultsListener.queryCompleted(queryState);
         } catch ( Exception e ) {
           resultsListener.submissionFailed(UserException.systemError(e).build());
         }
@@ -198,8 +199,8 @@ public class QueryResultHandler {
   private static class BufferingResultsListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
-    private volatile boolean finished = false;
     private volatile UserException ex;
+    private volatile QueryState queryState;
     private volatile UserResultsListener output;
     private volatile ConnectionThrottle throttle;
 
@@ -212,20 +213,22 @@ public class QueryResultHandler {
         if (ex != null) {
           l.submissionFailed(ex);
           return true;
-        } else if (finished) {
-          l.queryCompleted();
+        } else if (queryState != null) {
+          l.queryCompleted(queryState);
+          return true;
         }
 
-        return finished;
+        return false;
       }
     }
 
     @Override
-    public void queryCompleted() {
-      finished = true;
+    public void queryCompleted(QueryState state) {
+      assert queryState == null;
+      this.queryState = state;
       synchronized (this) {
         if (output != null) {
-          output.queryCompleted();
+          output.queryCompleted(state);
         }
       }
     }
@@ -245,7 +248,11 @@ public class QueryResultHandler {
 
     @Override
     public void submissionFailed(UserException ex) {
-      finished = true;
+      assert queryState == null;
+      // there is one case when submissionFailed() is called even though the query didn't fail on the server side
+      // it happens when UserResultsListener.batchArrived() throws an exception that will be passed to
+      // submissionFailed() by QueryResultHandler.dataArrived()
+      queryState = QueryState.FAILED;
       synchronized (this) {
         if (output == null) {
           this.ex = ex;
@@ -255,10 +262,6 @@ public class QueryResultHandler {
       }
     }
 
-    public boolean isFinished() {
-      return finished;
-    }
-
     @Override
     public void queryIdArrived(QueryId queryId) {
     }
@@ -281,8 +284,10 @@ public class QueryResultHandler {
     @Override
     public void success(QueryId queryId, ByteBuf buf) {
       resultsListener.queryIdArrived(queryId);
-      logger.debug("Received QueryId {} successfully.  Adding results listener {}.",
-                   queryId, resultsListener);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Received QueryId {} successfully. Adding results listener {}.",
+          QueryIdHelper.getQueryId(queryId), resultsListener);
+      }
       UserResultsListener oldListener =
           queryIdToResultsListenersMap.putIfAbsent(queryId, resultsListener);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index f928476..e422a3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.user;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 
 public interface UserResultsListener {
 
@@ -38,8 +39,9 @@ public interface UserResultsListener {
   /**
    * The query has completed (successsful completion or cancellation). The listener will not receive any other
    * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED)
+   * @param state
    */
-  void queryCompleted();
+  void queryCompleted(QueryState state);
 
   /**
    * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 877bc08..9e929de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -97,16 +97,18 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
     case RpcType.RUN_QUERY_VALUE:
       logger.debug("Received query to run.  Returning query handle.");
       try {
-        RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
-        return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
+        final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final QueryId queryId = worker.submitWork(connection, query);
+        return new Response(RpcType.QUERY_HANDLE, queryId);
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding RunQuery body.", e);
       }
 
     case RpcType.CANCEL_QUERY_VALUE:
       try {
-        QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
-        return new Response(RpcType.ACK, worker.cancelQuery(queryId));
+        final QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+        final Ack ack = worker.cancelQuery(queryId);
+        return new Response(RpcType.ACK, ack);
       } catch (InvalidProtocolBufferException e) {
         throw new RpcException("Failure while decoding QueryId body.", e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 19d77b0..e631792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.rpc.user;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import net.hydromatic.optiq.SchemaPlus;
 
@@ -42,6 +43,16 @@ public class UserSession {
   private UserCredentials credentials;
   private Map<String, String> properties;
   private OptionManager sessionOptions;
+  private final AtomicInteger queryCount;
+
+  /**
+   * Implementations of this interface are allowed to increment queryCount.
+   * {@link org.apache.drill.exec.work.user.UserWorker} should have a member that implements the interface.
+   * No other core class should implement this interface. Test classes may implement (see ControlsInjectionUtil).
+   */
+  public static interface QueryCountIncrementer {
+    public void increment(final UserSession session);
+  }
 
   public static class Builder {
     UserSession userSession;
@@ -56,7 +67,7 @@ public class UserSession {
     }
 
     public Builder withOptionManager(OptionManager systemOptions) {
-      userSession.sessionOptions = new SessionOptionManager(systemOptions);
+      userSession.sessionOptions = new SessionOptionManager(systemOptions, userSession);
       return this;
     }
 
@@ -87,7 +98,9 @@ public class UserSession {
     }
   }
 
-  private UserSession() { }
+  private UserSession() {
+    queryCount = new AtomicInteger(0);
+  }
 
   public boolean isSupportComplexTypes() {
     return supportComplexTypes;
@@ -105,6 +118,15 @@ public class UserSession {
     return credentials;
   }
 
+  public void incrementQueryCount(final QueryCountIncrementer incrementer) {
+    assert incrementer != null;
+    queryCount.incrementAndGet();
+  }
+
+  public int getQueryCount() {
+    return queryCount.get();
+  }
+
   /**
    * Update the schema path for the session.
    * @param fullPath The desired path to set to.

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index dbf3c74..6fdbfca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.testing.SimulatedExceptions;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Preconditions;
@@ -59,7 +58,6 @@ public class DrillbitContext {
   private final PStoreProvider provider;
   private final CodeCompiler compiler;
   private final ExecutorService executor;
-  private final SimulatedExceptions simulatedExceptions = new SimulatedExceptions();
 
   public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord,
       Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
@@ -164,7 +162,4 @@ public class DrillbitContext {
     return executor;
   }
 
-  public SimulatedExceptions getSimulatedExceptions() {
-    return simulatedExceptions;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
index 4e90616..682bfea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -67,8 +67,6 @@ public abstract class FallbackOptionManager extends BaseOptionManager {
   private void setValidatedOption(OptionValue value) {
     if (!setLocalOption(value)) {
       fallback.setOption(value);
-    }else{
-      setLocalOption(value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
index 0b8811a..0fed1fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionManager.java
@@ -39,6 +39,7 @@ public interface OptionManager extends Iterable<OptionValue> {
 
   public interface OptionAdmin {
     public void registerOptionType(OptionValidator validator);
+    public OptionValidator getValidator(String name);
     public void validate(OptionValue v) throws SetOptionException;
     public OptionValue validate(String name, SqlLiteral value, OptionType optionType) throws SetOptionException;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
index 43071e7..90ce3a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java
@@ -53,6 +53,32 @@ public abstract class OptionValidator {
     return optionName;
   }
 
+  /**
+   * This function returns true if and only if the validator is meant for a short-lived option.
+   *
+   * NOTE: By default, options are not short-lived. So, if a derived class is meant for a short-lived option,
+   * that class must do two things:
+   * (1) override this method to return true, and
+   * (2) return the number of queries for which the option is valid through {@link #getTtl}.
+   * E.g. {@link org.apache.drill.exec.testing.ExecutionControls.ControlsOptionValidator}
+   * @return if this validator is for a short-lived option
+   */
+  public boolean isShortLived() {
+    return false;
+  }
+
+  /**
+   * If an option is short-lived, this returns the number of queries for which the option is valid.
+   * Please read the note at {@link #isShortLived}
+   * @return number of queries for which the option should be valid
+   */
+  public int getTtl() {
+    if (!isShortLived()) {
+      throw new UnsupportedOperationException("This option is not short-lived.");
+    }
+    return 0;
+  }
+
   public String getDefaultString() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index c3de190..340358f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,13 +17,86 @@
  */
 package org.apache.drill.exec.server.options;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class SessionOptionManager extends InMemoryOptionManager {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
 
-  public SessionOptionManager(OptionManager systemOptions) {
+  private final UserSession session;
+
+  /**
+   * Map of short lived options. Key: option name, Value: [ start, end )
+   */
+  private final ConcurrentHashMap<String, ImmutablePair<Integer, Integer>> shortLivedOptions = new ConcurrentHashMap<>();
+
+  public SessionOptionManager(final OptionManager systemOptions, final UserSession session) {
     super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
+    this.session = session;
+  }
+
+  @Override
+  boolean setLocalOption(final OptionValue value) {
+    final boolean set = super.setLocalOption(value);
+    final String name = value.name;
+    final OptionValidator validator = fallback.getAdmin().getValidator(name); // NOTE(review): may be null for an unregistered name - confirm callers validate first
+    final boolean shortLived = validator.isShortLived();
+    if (set && shortLived) {
+      final int start = session.getQueryCount() + 1; // start from the next query
+      final int ttl = validator.getTtl();
+      final int end = start + ttl; // exclusive end: the option is live for query counts in [start, end)
+      shortLivedOptions.put(name, new ImmutablePair<>(start, end)); // remember the live window for this option
+    }
+    return set;
+  }
+
+  @Override
+  OptionValue getLocalOption(final String name) {
+    final OptionValue value = options.get(name);
+    if (shortLivedOptions.containsKey(name)) {
+      if (withinRange(value)) { // live: start <= query count < end
+        return value;
+      }
+      final int queryNumber = session.getQueryCount();
+      final int start = shortLivedOptions.get(name).getLeft();
+      // option is not yet in effect if queryNumber < start; report the validator's default instead
+      if (queryNumber < start) {
+        return fallback.getAdmin().getValidator(name).getDefault();
+      // otherwise queryNumber >= end: the option has expired, so reset it
+      } else {
+        options.remove(name);
+        shortLivedOptions.remove(name);
+        return null; // fallback takes effect
+      }
+    }
+    return value;
+  }
+
+  private boolean withinRange(final OptionValue value) {
+    final int queryNumber = session.getQueryCount();
+    final ImmutablePair<Integer, Integer> pair = shortLivedOptions.get(value.name);
+    final int start = pair.getLeft();
+    final int end = pair.getRight();
+    return start <= queryNumber && queryNumber < end;
+  }
+
+  private final Predicate<OptionValue> isLive = new Predicate<OptionValue>() {
+    @Override
+    public boolean apply(final OptionValue value) {
+      final String name = value.name;
+      return !shortLivedOptions.containsKey(name) || withinRange(value);
+    }
+  };
+
+  @Override
+  Iterable<OptionValue> optionIterable() {
+    final Collection<OptionValue> liveOptions = Collections2.filter(options.values(), isLive);
+    return liveOptions;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4471d4f..a745479 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -101,7 +101,7 @@ public class SystemOptionManager extends BaseOptionManager {
       QueryClassLoader.JAVA_COMPILER_DEBUG,
       ExecConstants.ENABLE_VERBOSE_ERRORS,
       ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
-      ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR,
+      ExecConstants.DRILLBIT_CONTROLS_VALIDATOR,
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
   };
 
@@ -216,6 +216,11 @@ public class SystemOptionManager extends BaseOptionManager {
     }
 
     @Override
+    public OptionValidator getValidator(final String name) {
+      return knownOptions.get(name);
+    }
+
+    @Override
     public void validate(final OptionValue v) throws SetOptionException {
       final OptionValidator validator = knownOptions.get(v.name);
       if (validator == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index b9721cc..e7b1eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -17,10 +17,12 @@
  */
 package org.apache.drill.exec.server.options;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.common.exceptions.ExpressionParsingException;
 import org.apache.drill.exec.server.options.OptionValue.Kind;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
@@ -155,6 +157,35 @@ public class TypeValidators {
     }
   }
 
+  /**
+   * Validator for POJO passed in as JSON string
+   */
+  public static class JsonStringValidator extends StringValidator {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+    private final Class<?> clazz;
+
+    public JsonStringValidator(final String name, final Class<?> clazz, final String def) {
+      super(name, def);
+      this.clazz = clazz;
+      validateJson(def, clazz);
+    }
+
+    @Override
+    public void validate(final OptionValue v) throws ExpressionParsingException {
+      super.validate(v);
+      validateJson(v.string_val, clazz);
+    }
+
+    private static void validateJson(final String jsonString, final Class<?> clazz) {
+      try {
+        mapper.readValue(jsonString, clazz);
+      } catch (IOException e) {
+        throw new ExpressionParsingException("Invalid JSON string (" + jsonString + ") for class " + clazz.getName(), e);
+      }
+    }
+  }
+
   public static abstract class TypeValidator extends OptionValidator {
     private final Kind kind;
     private final OptionValue defaultValue;

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 62f5bdb..aa43aa9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -136,7 +137,7 @@ public class QueryWrapper {
     }
 
     @Override
-    public void queryCompleted() {
+    public void queryCompleted(QueryState state) {
       latch.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
index 68cbf08..61f0d67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
@@ -17,38 +17,48 @@
  */
 package org.apache.drill.exec.testing;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Injection for a single exception. Specifies how many times to inject it, and how many times to skip
  * injecting it before the first injection. This class is used internally for tracking injected
  * exceptions; injected exceptions are specified via the
- * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_EXCEPTION_INJECTIONS} system option.
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
  */
-public class ExceptionInjection {
-  private final String desc; // description of the injection site
-
-  private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
-  private final AtomicInteger nThrow; // the number of times to do the injection, after any skips; starts > 0
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class ExceptionInjection extends Injection {
 
   private final Class<? extends Throwable> exceptionClass;
 
-  /**
-   * Constructor.
-   *
-   * @param desc description of the injection site; useful for multiple injections in a single class
-   * @param nSkip non-negative number of times to skip injecting the exception
-   * @param nFire positive number of times to inject the exception
-   * @param exceptionClass
-   */
-  public ExceptionInjection(final String desc, final int nSkip, final int nFire,
-      final Class<? extends Throwable> exceptionClass) {
-    this.desc = desc;
-    this.nSkip = new AtomicInteger(nSkip);
-    this.nThrow = new AtomicInteger(nFire);
-    this.exceptionClass = exceptionClass;
+  @JsonCreator // ensures instances are created only through JSON
+  private ExceptionInjection(@JsonProperty("address") final String address,
+                             @JsonProperty("port") final int port,
+                             @JsonProperty("siteClass") final String siteClass,
+                             @JsonProperty("desc") final String desc,
+                             @JsonProperty("nSkip") final int nSkip,
+                             @JsonProperty("nFire") final int nFire,
+                             @JsonProperty("exceptionClass") String classString) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, nFire);
+    final Class<?> clazz;
+    try {
+      clazz = Class.forName(classString);
+    } catch (ClassNotFoundException e) {
+      throw new InjectionConfigurationException("Injected exceptionClass not found.", e);
+    }
+
+    if (!Throwable.class.isAssignableFrom(clazz)) {
+      throw new InjectionConfigurationException("Injected exceptionClass is not a Throwable.");
+    }
+
+    @SuppressWarnings("unchecked")
+    final Class<? extends Throwable> exceptionClazz = (Class<? extends Throwable>) clazz;
+    this.exceptionClass = exceptionClazz;
   }
 
   /**
@@ -57,29 +67,25 @@ public class ExceptionInjection {
    * @return the exception to throw, or null if it isn't time to throw it
    */
   private Throwable constructException() {
-    final int remainingSkips = nSkip.decrementAndGet();
-    if (remainingSkips >= 0) {
-      return null;
-    }
-
-    final int remainingFirings = nThrow.decrementAndGet();
-    if (remainingFirings < 0) {
+    if (! injectNow()) {
       return null;
     }
 
     // if we get here, we should throw the specified exception
-    Constructor<?> constructor;
+    final Constructor<?> constructor;
     try {
       constructor = exceptionClass.getConstructor(String.class);
-    } catch(NoSuchMethodException e) {
-      throw new RuntimeException("No constructor found that takes a single String argument");
+    } catch (NoSuchMethodException e) {
+      // not expected for well-formed injections; keep the cause so the failure stays diagnosable.
+      throw new RuntimeException("No constructor found that takes a single String argument.", e);
     }
 
-    Throwable throwable;
+    final Throwable throwable;
     try {
-      throwable = (Throwable) constructor.newInstance(desc);
-    } catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      throw new IllegalStateException("Couldn't construct exception instance", e);
+      throwable = (Throwable) constructor.newInstance(getDesc());
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // this should not throw; validated already.
+      throw new IllegalStateException("Couldn't construct exception instance.", e);
     }
 
     return throwable;
@@ -105,16 +111,16 @@ public class ExceptionInjection {
       throw e;
     }
 
-    throw new IllegalStateException("throwable was not an unchecked exception");
+    throw new IllegalStateException("Throwable was not an unchecked exception.");
   }
 
   /**
    * Throw the checked exception specified by this injection.
    *
    * @param exceptionClass the class of the exception to throw
-   * @throws T if it is time to throw the exception
+   * @throws T                     if it is time to throw the exception
    * @throws IllegalStateException if it is time to throw the exception, and the exception's class
-   *   is incompatible with the class specified by the injection
+   *                               is incompatible with the class specified by the injection
    */
   public <T extends Throwable> void throwChecked(final Class<T> exceptionClass) throws T {
     final Throwable throwable = constructException();
@@ -128,6 +134,6 @@ public class ExceptionInjection {
     }
 
     throw new IllegalStateException("Constructed Throwable(" + throwable.getClass().getName()
-        + ") is incompatible with exceptionClass("+ exceptionClass.getName() + ")");
+      + ") is incompatible with exceptionClass(" + exceptionClass.getName() + ")");
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
deleted file mode 100644
index 54bc351..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.testing;
-
-import org.apache.drill.exec.server.DrillbitContext;
-
-/**
- * Injects exceptions at execution time for testing. Any class that wants to simulate exceptions
- * for testing should have it's own private static instance of an injector (similar to the use
- * of loggers).
- *
- * <p>See {@link org.apache.drill.exec.testing.TestExceptionInjection} for examples of
- * use.
- */
-public class ExceptionInjector {
-  private final Class<?> clazz; // the class that owns this injector
-
-  /**
-   * Constructor. Classes should use the static {@link #getInjector()} method to obtain
-   * their injector.
-   *
-   * @param clazz the owning class
-   */
-  private ExceptionInjector(final Class<?> clazz) {
-    this.clazz = clazz;
-  }
-
-  /**
-   * Create an injector.
-   *
-   * @param clazz the owning class
-   * @return the newly created injector
-   */
-  public static ExceptionInjector getInjector(final Class<?> clazz) {
-    return new ExceptionInjector(clazz);
-  }
-
-  /**
-   * Get the injector's owning class.
-   *
-   * @return the injector's owning class
-   */
-  public Class<?> getSiteClass() {
-    return clazz;
-  }
-
-  /**
-   * Lookup an injection within this class that matches the site description.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * @return the injection, if there is one; null otherwise
-   */
-  private ExceptionInjection getInjection(final DrillbitContext drillbitContext, final String desc) {
-    final SimulatedExceptions simulatedExceptions = drillbitContext.getSimulatedExceptions();
-    final ExceptionInjection exceptionInjection = simulatedExceptions.lookupInjection(drillbitContext, this, desc);
-    return exceptionInjection;
-  }
-
-  /**
-   * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
-   * for it to be thrown.
-   *
-   * <p>Implementors use this in their code at a site where they want to simulate an exception
-   * during testing.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * throws the exception specified by the injection, if it is time
-   */
-  public void injectUnchecked(final DrillbitContext drillbitContext, final String desc) {
-    final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
-    if (exceptionInjection != null) {
-      exceptionInjection.throwUnchecked();
-    }
-  }
-
-  /**
-   * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
-   * for it to be thrown.
-   *
-   * <p>Implementors use this in their code at a site where they want to simulate an exception
-   * during testing.
-   *
-   * @param drillbitContext
-   * @param desc the site description
-   * @param exceptionClass the expected class of the exception (or a super class of it)
-   * @throws T the exception specified by the injection, if it is time
-   */
-  public <T extends Throwable> void injectChecked(
-      final DrillbitContext drillbitContext, final String desc, final Class<T> exceptionClass) throws T {
-    final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
-    if (exceptionInjection != null) {
-      exceptionInjection.throwChecked(exceptionClass);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
new file mode 100644
index 0000000..1171bf8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExpressionParsingException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.server.options.TypeValidators.TypeValidator;
+import org.apache.drill.exec.testing.InjectionSite.InjectionSiteKeyDeserializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tracks the simulated controls that will be injected for testing purposes.
+ */
+public final class ExecutionControls {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionControls.class);
+
+  // used to map JSON specified injections to POJOs
+  public static final ObjectMapper controlsOptionMapper = new ObjectMapper();
+
+  static {
+    controlsOptionMapper.addMixInAnnotations(Injection.class, InjectionMixIn.class);
+  }
+
+  // Jackson MixIn for all types of injections
+  @JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type")
+  @JsonSubTypes({
+    @Type(value = ExceptionInjection.class, name = "exception"),
+    @Type(value = PauseInjection.class, name = "pause")})
+  public static abstract class InjectionMixIn {
+  }
+
+  /**
+   * The JSON specified for the {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS}
+   * option is validated using this class. Controls are short-lived options.
+   */
+  public static class ControlsOptionValidator extends TypeValidator {
+    private final int ttl; // the number of queries for which this option is valid
+
+    /**
+     * Constructor for controls option validator. The option kind is STRING, matching its string default.
+     * @param name the name of the validator
+     * @param def  the default JSON, specified as string
+     * @param ttl  the number of queries for which this option should be valid
+     */
+    public ControlsOptionValidator(final String name, final String def, final int ttl) {
+      super(name, OptionValue.Kind.STRING, OptionValue.createString(OptionType.SESSION, name, def));
+      assert ttl > 0;
+      this.ttl = ttl;
+    }
+
+    @Override
+    public int getTtl() {
+      return ttl;
+    }
+
+    @Override
+    public boolean isShortLived() {
+      return true;
+    }
+
+    @Override
+    public void validate(final OptionValue v) throws ExpressionParsingException {
+      if (v.type != OptionType.SESSION) {
+        throw new ExpressionParsingException("Controls can be set only at SESSION level.");
+      }
+      final String jsonString = v.string_val;
+      try {
+        controlsOptionMapper.readValue(jsonString, Controls.class);
+      } catch (IOException e) {
+        throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
+      }
+    }
+  }
+
+  /**
+   * POJO used to parse JSON-specified controls.
+   */
+  public static class Controls {
+    public Collection<? extends Injection> injections;
+  }
+
+  /**
+   * The default value for controls.
+   */
+  public static final String DEFAULT_CONTROLS = "{}";
+
+  /**
+   * Caches the currently specified controls.
+   */
+  @JsonDeserialize(keyUsing = InjectionSiteKeyDeserializer.class)
+  private final Map<InjectionSite, Injection> controls = new HashMap<>();
+
+  private final DrillbitEndpoint endpoint; // the current endpoint
+
+  /**
+   * Parses the {@link ExecConstants#DRILLBIT_CONTROL_INJECTIONS} option, if set, and caches its injections by site.
+   * @param options  the option manager to read the controls option from
+   * @param endpoint the endpoint of the current drillbit
+   */
+  public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+    final OptionValue optionValue = options.getOption(ExecConstants.DRILLBIT_CONTROL_INJECTIONS);
+    if (optionValue == null) {
+      return;
+    }
+    final String opString = optionValue.string_val;
+    final Controls parsed;
+    try {
+      parsed = controlsOptionMapper.readValue(opString, Controls.class);
+    } catch (IOException e) {
+      // This never happens. opString must have been validated.
+      logger.warn("Could not parse injections. Injections must have been validated before this point.");
+      throw new DrillRuntimeException("Could not parse injections.", e);
+    }
+    if (parsed.injections == null) {
+      return;
+    }
+    logger.debug("Adding control injections: \n{}", opString);
+    for (final Injection injection : parsed.injections) {
+      controls.put(new InjectionSite(injection.getSiteClass(), injection.getDesc()), injection);
+    }
+  }
+
+  /**
+   * Look for an exception injection matching the given injector, site description and endpoint.
+   *
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the exception injection for the injector, site and endpoint; null if none, or if it is not an exception
+   */
+  public ExceptionInjection lookupExceptionInjection(final ExecutionControlsInjector injector, final String desc) {
+    final Injection injection = lookupInjection(injector, desc);
+    return injection instanceof ExceptionInjection ? (ExceptionInjection) injection : null;
+  }
+
+  /**
+   * Look for a pause injection matching the given injector, site description and endpoint.
+   *
+   * @param injector the injector, which indicates a class
+   * @param desc     the injection site description
+   * @return the pause injection for the injector, site and endpoint; null if none, or if it is not a pause
+   */
+  public PauseInjection lookupPauseInjection(final ExecutionControlsInjector injector, final String desc) {
+    final Injection injection = lookupInjection(injector, desc);
+    return injection instanceof PauseInjection ? (PauseInjection) injection : null;
+  }
+
+  private Injection lookupInjection(final ExecutionControlsInjector injector, final String desc) {
+    if (controls.isEmpty()) {
+      return null;
+    }
+    // lookup the request
+    final InjectionSite site = new InjectionSite(injector.getSiteClass(), desc);
+    final Injection injection = controls.get(site);
+    if (injection == null) {
+      return null;
+    }
+    // return only if injection was meant for this drillbit
+    return injection.isValidForBit(endpoint) ? injection : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
new file mode 100644
index 0000000..4b1cd0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.util.AssertionUtil;
+import org.slf4j.Logger;
+
+/**
+ * Fires exceptions and pauses at named injection sites during execution, for testing. A class that wants to
+ * simulate exceptions or pauses should hold its own private static injector (much like a logger) and call it
+ * at each site, passing the {@link ExecutionControls} obtained from its
+ * {@link org.apache.drill.exec.ops.FragmentContext} or {@link org.apache.drill.exec.ops.QueryContext}. See
+ * {@link org.apache.drill.exec.testing.TestExceptionInjection} and
+ * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use.
+ */
+public class ExecutionControlsInjector {
+
+  private final Class<?> siteClass; // the class that owns this injector
+
+  /**
+   * Constructor. Classes should use the static {@link #getInjector} method to obtain their injector.
+   *
+   * @param clazz the owning class
+   */
+  protected ExecutionControlsInjector(final Class<?> clazz) {
+    siteClass = clazz;
+  }
+
+  /**
+   * Creates the injector for a class: a real one when assertions are enabled, a no-op one otherwise.
+   *
+   * @param clazz the owning class
+   * @return the newly created injector
+   */
+  public static ExecutionControlsInjector getInjector(final Class<?> clazz) {
+    return AssertionUtil.isAssertionsEnabled()
+      ? new ExecutionControlsInjector(clazz)
+      : new NoOpControlsInjector(clazz);
+  }
+
+  /**
+   * Get the injector's owning class.
+   *
+   * @return the injector's owning class
+   */
+  public Class<?> getSiteClass() {
+    return siteClass;
+  }
+
+  /**
+   * Throws an unchecked exception at this point, if an injection is specified for this site and it is
+   * time for it to be thrown.
+   *
+   * <p>Implementors call this at a site where they want to simulate an exception during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @return this injector
+   */
+  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+    final ExceptionInjection injection = executionControls.lookupExceptionInjection(this, desc);
+    if (injection == null) {
+      return this;
+    }
+    injection.throwUnchecked();
+    return this;
+  }
+
+  /**
+   * Throws a checked exception at this point, if an injection is specified for this site and it is
+   * time for it to be thrown.
+   *
+   * <p>Implementors call this at a site where they want to simulate an exception during testing.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param exceptionClass    the expected class of the exception (or a super class of it)
+   * @param <T>               the type of checked exception the site may throw
+   * @return this injector
+   * @throws T the exception specified by the injection, if it is time
+   */
+  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+    final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
+    final ExceptionInjection injection = executionControls.lookupExceptionInjection(this, desc);
+    if (injection == null) {
+      return this;
+    }
+    injection.throwChecked(exceptionClass);
+    return this;
+  }
+
+  /**
+   * Pauses at this point, if such an injection is specified (i.e. matches the site description).
+   *
+   * <p>Implementors call this at a site where they want to simulate a pause during testing.
+   * When an injection matches, "Pausing" and "Resuming" messages are logged at debug level on the given logger.
+   *
+   * @param executionControls the controls in the current context
+   * @param desc              the site description
+   * @param logger            logger of the class containing the injection site
+   * @return this injector
+   */
+  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+                                               final Logger logger) {
+    final PauseInjection injection = executionControls.lookupPauseInjection(this, desc);
+    if (injection == null) {
+      return this;
+    }
+    logger.debug("Pausing at {}", desc);
+    injection.pause();
+    logger.debug("Resuming at {}", desc);
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
new file mode 100644
index 0000000..96fed3a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/Injection.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The base class for all types of injections (currently, pause and exception).
+ */
+public abstract class Injection {
+
+  protected final String address;  // the address of the drillbit on which to inject
+  protected final int port; // user port of the drillbit; useful when there are multiple drillbits on same machine
+  private final Class<?> siteClass; // the class where the injection should happen
+  private final String desc; // description of the injection site; useful for multiple exception injections in a single class
+  private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
+  private final AtomicInteger nFire;  // the number of times to do the injection, after any skips; starts > 0
+
+  /**
+   * Validates the injection configuration and initializes the skip and fire counters.
+   *
+   * @param address   the address of the drillbit on which to inject; null matches every drillbit
+   * @param port      the user port of the drillbit on which to inject; not consulted when address is null
+   * @param siteClass the name of the class where the injection should happen, as accepted by
+   *                  {@link Class#forName(String)}
+   * @param desc      the description of the injection site
+   * @param nSkip     the number of times to skip the injection; must be non-negative
+   * @param nFire     the number of times to do the injection, after any skips; must be positive
+   * @throws InjectionConfigurationException if desc is null or empty, nSkip is negative, nFire is not
+   *                                         positive, or siteClass is null or cannot be found
+   */
+  protected Injection(final String address, final int port, final String siteClass, final String desc,
+                      final int nSkip, final int nFire) throws InjectionConfigurationException {
+    if (desc == null || desc.isEmpty()) {
+      throw new InjectionConfigurationException("Injection desc is null or empty.");
+    }
+
+    if (nSkip < 0) {
+      throw new InjectionConfigurationException("Injection nSkip is not non-negative.");
+    }
+
+    if (nFire <= 0) {
+      throw new InjectionConfigurationException("Injection nFire is non-positive.");
+    }
+
+    // Class.forName(null) fails with a NullPointerException, so report a missing class name explicitly.
+    if (siteClass == null) {
+      throw new InjectionConfigurationException("Injection siteClass is null.");
+    }
+    try {
+      this.siteClass = Class.forName(siteClass);
+    } catch (ClassNotFoundException e) {
+      throw new InjectionConfigurationException("Injection siteClass not found.", e);
+    }
+
+    this.address = address;
+    this.port = port;
+    this.desc = desc;
+    this.nSkip = new AtomicInteger(nSkip);
+    this.nFire = new AtomicInteger(nFire);
+  }
+
+  /**
+   * This function checks if it is the right time for the injection to happen. Every call counts down the
+   * skip counter; once the skips are used up, every call also counts down the fire counter.
+   *
+   * @return if the injection should be injected now
+   */
+  protected final boolean injectNow() {
+    return nSkip.decrementAndGet() < 0 && nFire.decrementAndGet() >= 0;
+  }
+
+  /**
+   * @return the description of the injection site
+   */
+  public String getDesc() {
+    return desc;
+  }
+
+  /**
+   * @return the class where the injection should happen
+   */
+  public Class<?> getSiteClass() {
+    return siteClass;
+  }
+
+  /**
+   * Checks whether this injection applies to the given drillbit.
+   * If the address is null, the injection must happen on every drillbit that reaches the specified site.
+   *
+   * @param endpoint the endpoint of the drillbit to check
+   * @return true if address is null, or if both the address and the user port match the endpoint
+   */
+  public final boolean isValidForBit(final DrillbitEndpoint endpoint) {
+    return address == null ||
+      (address.equals(endpoint.getAddress()) && port == endpoint.getUserPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
new file mode 100644
index 0000000..4fcb33a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionConfigurationException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+/**
+ * Signals that the configuration of an injection is incorrect.
+ */
+public class InjectionConfigurationException extends Exception {
+
+  /**
+   * @param message a description of the configuration problem
+   */
+  public InjectionConfigurationException(final String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause the underlying failure
+   */
+  public InjectionConfigurationException(final Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message a description of the configuration problem
+   * @param cause   the underlying failure
+   */
+  public InjectionConfigurationException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
index 9e19fdd..2bb9acc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
@@ -17,8 +17,13 @@
  */
 package org.apache.drill.exec.testing;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
 import com.google.common.base.Preconditions;
 
+import java.io.IOException;
+
 public class InjectionSite {
   private final Class<?> clazz;
   private final String desc;
@@ -31,14 +36,6 @@ public class InjectionSite {
     this.desc = desc;
   }
 
-  public Class<?> getSiteClass() {
-    return clazz;
-  }
-
-  public String getDesc() {
-    return desc;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (o == null) {
@@ -65,8 +62,35 @@ public class InjectionSite {
     return true;
   }
 
+  private static final String SEPARATOR = ",";
+
+  /**
+   * Renders this site as the owning class name, then {@code SEPARATOR}, then the site description.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder rendered = new StringBuilder(clazz.getName());
+    return rendered.append(SEPARATOR).append(desc).toString();
+  }
+
   @Override
   public int hashCode() {
     return (clazz.hashCode() + 13) ^ (1 - desc.hashCode());
   }
+
+  /**
+   * Key Deserializer for InjectionSite.
+   * Since JSON object keys must be strings, deserialize from a string of the form produced by
+   * {@link InjectionSite#toString()}: the class name, the separator, then the site description.
+   */
+  public static class InjectionSiteKeyDeserializer extends KeyDeserializer {
+
+    /**
+     * Rebuilds an {@link InjectionSite} from its string key.
+     *
+     * @param key     the serialized site: class name, separator, description
+     * @param context the deserialization context; not used here
+     * @return the injection site described by the key
+     * @throws IOException if the key contains no separator, or the named class cannot be found
+     */
+    @Override
+    public Object deserializeKey(final String key, final DeserializationContext context)
+      throws IOException, JsonProcessingException {
+      // Split on the first separator only: a class name never contains one, but a description may.
+      final String[] fields = key.split(SEPARATOR, 2);
+      if (fields.length != 2) {
+        // Without this check a key with no separator fails with an ArrayIndexOutOfBoundsException.
+        throw new IOException("Invalid injection site key (" + key + "); expected <class>" + SEPARATOR + "<desc>.");
+      }
+      final Class<?> siteClass;
+      try {
+        siteClass = Class.forName(fields[0]);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Class " + fields[0] + " not found.", e);
+      }
+      return new InjectionSite(siteClass, fields[1]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
new file mode 100644
index 0000000..80d9790
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.slf4j.Logger;
+
+/**
+ * An injector that does not inject any controls: every injection request returns this injector
+ * immediately, without consulting the execution controls.
+ */
+public final class NoOpControlsInjector extends ExecutionControlsInjector {
+
+  /**
+   * @param siteClass the class that owns this injector
+   */
+  protected NoOpControlsInjector(final Class<?> siteClass) {
+    super(siteClass);
+  }
+
+  /** Does nothing; no pause is injected and nothing is logged. */
+  @Override
+  public ExecutionControlsInjector injectPause(final ExecutionControls executionControls, final String desc,
+                                               final Logger logger) {
+    return this;
+  }
+
+  /** Does nothing; no unchecked exception is injected. */
+  @Override
+  public ExecutionControlsInjector injectUnchecked(final ExecutionControls executionControls, final String desc) {
+    return this;
+  }
+
+  /** Does nothing; no checked exception is injected. */
+  @Override
+  public <T extends Throwable> ExecutionControlsInjector injectChecked(
+    final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/be8d9539/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
new file mode 100644
index 0000000..e5f9c9c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/PauseInjection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+/**
+ * Injection for a single pause. Specifies how long to pause. This class is used internally for tracking
+ * injected pauses; these pauses are specified via
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+ *
+ * TODO(DRILL-2697): Pause indefinitely until signalled, rather than for a specified time.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class PauseInjection extends Injection {
+
+  private final long millis; // how long each fired pause sleeps, in milliseconds; always > 0
+
+  /**
+   * Creates a pause injection from its JSON properties.
+   *
+   * @throws InjectionConfigurationException if millis is not positive, or if the {@link Injection}
+   *                                         constructor rejects the remaining properties
+   */
+  @JsonCreator // ensures instances are created only through JSON
+  private PauseInjection(@JsonProperty("address") final String address,
+                         @JsonProperty("port") final int port,
+                         @JsonProperty("siteClass") final String siteClass,
+                         @JsonProperty("desc") final String desc,
+                         @JsonProperty("nSkip") final int nSkip,
+                         @JsonProperty("nFire") final int nFire,
+                         @JsonProperty("millis") final long millis) throws InjectionConfigurationException {
+    super(address, port, siteClass, desc, nSkip, nFire);
+    if (millis <= 0) {
+      throw new InjectionConfigurationException("Pause millis is non-positive.");
+    }
+    this.millis = millis;
+  }
+
+  /**
+   * Sleeps for the configured number of milliseconds, if it is time for this injection to fire.
+   *
+   * @throws DrillRuntimeException if the sleeping thread is interrupted; the thread's interrupt status is
+   *                               restored before the exception is thrown
+   */
+  public void pause() {
+    if (! injectNow()) {
+      return;
+    }
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe the interruption,
+      // and keep the original exception as the cause.
+      Thread.currentThread().interrupt();
+      throw new DrillRuntimeException("Well, I should be sleeping.", e);
+    }
+  }
+}