Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/26 12:44:30 UTC

[06/11] drill git commit: DRILL-5730: Mock testing improvements and interface improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index f4a9825..aa067cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -145,7 +145,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     try {
       doWork();
     } catch (DrillException e) {
-      context.fail(e);
+      context.getExecutorState().fail(e);
       cleanup();
       return IterOutcome.STOP;
     }

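This hunk shows the pattern repeated across the operator classes in this commit: failure reporting and cancellation checks no longer go through context.fail() / context.shouldContinue() directly, but through the executor state exposed by the fragment context. A minimal sketch of the resulting call pattern, using simplified stand-in types rather than Drill's real interfaces (ExecutorState, FragmentContext, ExampleOperator and doWork() here are hypothetical), might look like this:

// Hypothetical stand-in types modelling the new call pattern; not Drill's real classes.
interface ExecutorState {
  void fail(Throwable t);       // record a fragment-level failure
  boolean shouldContinue();     // false once the fragment is cancelled or failed
}

interface FragmentContext {
  ExecutorState getExecutorState();
}

class ExampleOperator {
  private final FragmentContext context;

  ExampleOperator(FragmentContext context) {
    this.context = context;
  }

  /** Returns false when the fragment should stop, mirroring IterOutcome.STOP. */
  boolean processBatch() {
    if (!context.getExecutorState().shouldContinue()) {
      return false;                        // cancellation check goes through the executor state
    }
    try {
      doWork();
    } catch (Exception e) {
      context.getExecutorState().fail(e);  // replaces the old context.fail(e)
      return false;
    }
    return true;
  }

  private void doWork() { /* operator-specific work */ }
}
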
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index c212593..9cde1a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -201,7 +201,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         // where it would have been passed to context.fail()
         // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
         // right away, this will also make sure we collect any additional exception we may get while cleaning up
-        context.fail(e);
+        context.getExecutorState().fail(e);
       }
     }
   }
@@ -483,7 +483,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         mSorter.sort(this.container);
 
         // sort may have prematurely exited due to should continue returning false.
-        if (!context.shouldContinue()) {
+        if (!context.getExecutorState().shouldContinue()) {
           return IterOutcome.STOP;
         }
 
@@ -522,12 +522,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     } catch (SchemaChangeException ex) {
       kill(false);
-      context.fail(UserException.unsupportedError(ex)
+      context.getExecutorState().fail(UserException.unsupportedError(ex)
         .message("Sort doesn't currently support sorts with changing schemas").build(logger));
       return IterOutcome.STOP;
     } catch(ClassTransformationException | IOException ex) {
       kill(false);
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return IterOutcome.STOP;
     } catch (UnsupportedOperationException e) {
       throw new RuntimeException(e);
@@ -650,7 +650,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         try {
           Thread.sleep(waitTime * 1000);
         } catch(final InterruptedException e) {
-          if (!context.shouldContinue()) {
+          if (!context.getExecutorState().shouldContinue()) {
             throw e;
           }
         }
@@ -688,11 +688,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException {
-    return createNewMSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+    return createNewMSorter(context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
   }
 
-  private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
-          throws ClassTransformationException, IOException, SchemaChangeException{
+  private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet
+    rightMapping)
+          throws ClassTransformationException, IOException, SchemaChangeException {
     CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions());
     ClassGenerator<MSorter> g = cg.getRoot();
     g.setMappingSet(mainMapping);
@@ -735,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch)
-          throws ClassTransformationException, IOException, SchemaChangeException{
+          throws ClassTransformationException, IOException, SchemaChangeException {
     CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions());
     cg.plainJavaCapable(true); // This class can generate plain-old Java.
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index e579fc2..6a60196 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -22,19 +22,19 @@ import java.util.List;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
 
 import com.google.common.base.Preconditions;
+import org.apache.drill.exec.server.options.OptionManager;
 
 public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
 
   @Override
-  public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+  public AbstractRecordBatch<ExternalSort> getBatch(ExecutorFragmentContext context, ExternalSort config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
 

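The creator-side half of the interface change is visible above: BatchCreator implementations now receive an ExecutorFragmentContext rather than the old concrete FragmentContext. A rough, self-contained sketch of that shape, again with hypothetical simplified types standing in for Drill's own, could be:

// Hypothetical simplified types; only the getBatch() signature shape matches the commit.
import java.util.List;

interface ExecutorFragmentContext { /* executor-side fragment services */ }

interface RecordBatch { }

interface BatchCreator<T> {
  RecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children)
      throws Exception;
}

class ExampleBatchCreator implements BatchCreator<Object> {
  @Override
  public RecordBatch getBatch(ExecutorFragmentContext context, Object config, List<RecordBatch> children)
      throws Exception {
    if (children.size() != 1) {
      throw new IllegalArgumentException("expected exactly one child batch");
    }
    return children.get(0); // placeholder; a real creator builds its operator batch here
  }
}
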
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 2f3d2f6..9b69170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -130,7 +130,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
     while (runStarts.size() > 1) {
 
       // check if we're cancelled/failed frequently
-      if (!context.shouldContinue()) {
+      if (!context.getExecutorState().shouldContinue()) {
         return;
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
index 733ea5e..4dbee3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 public interface SingleBatchSorter {
-  public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
+  public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
   public void sort(SelectionVector2 vector2) throws SchemaChangeException;
 
   public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION =

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 0f4680d..4a1af4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
   private SelectionVector2 vector2;
 
   @Override
-  public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
+  public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
     Preconditions.checkNotNull(vector2);
     this.vector2 = vector2;
     try {
@@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
     }
   }
 
-  public abstract void doSetup(@Named("context") FragmentContextInterface context,
+  public abstract void doSetup(@Named("context") FragmentContext context,
                                @Named("incoming") VectorAccessible incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9150fe3..23e66a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -135,8 +135,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
  * into new batches of a size determined by that operator.</li>
  * <li>A series of batches, without a selection vector, if the sort spills to
  * disk. In this case, the downstream operator will still be a selection vector
- * remover, but there is nothing for that operator to remove. Each batch is
- * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
+ * remover, but there is nothing for that operator to remove.
  * </ul>
  * Note that, even in the in-memory sort case, this operator could do the copying
  * to eliminate the extra selection vector remover. That is left as an exercise
@@ -375,7 +374,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // sort may have prematurely exited due to shouldContinue() returning false.
 
-    if (! context.shouldContinue()) {
+    if (!context.getExecutorState().shouldContinue()) {
       sortState = SortState.DONE;
       return IterOutcome.STOP;
     }
@@ -440,8 +439,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   /**
    * Handle a new schema from upstream. The ESB is quite limited in its ability
    * to handle schema changes.
-   *
-   * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    */
 
   private void setupSchema()  {
@@ -482,6 +479,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    * <p>
    * Some Drill code ends up calling close() two or more times. The code
    * here protects itself from these undesirable semantics.
+   * </p>
    */
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 698e32f..625d360 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
    */
 
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
-  private FragmentContextInterface context;
+  private FragmentContext context;
 
   /**
    * Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   private int desiredRecordBatchCount;
 
   @Override
-  public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4,
+  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
                     final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
@@ -162,7 +162,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
       final int totalCount = this.vector4.getTotalCount();
 
       // check if we're cancelled/failed recently
-      if (!context.shouldContinue()) {
+      if (!context.getExecutorState().shouldContinue()) {
         return; }
 
       int outIndex = 0;
@@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
     }
   }
 
-  public abstract void doSetup(@Named("context") FragmentContextInterface context,
+  public abstract void doSetup(@Named("context") FragmentContext context,
                                @Named("incoming") VectorContainer incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 428f6f8..8eae8b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  */
 
 public interface MSorter {
-  public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4,
+  public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4,
                     VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
   public void sort();
   public SelectionVector4 getSV4();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index eb90614..015d078 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -103,7 +103,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   public final IterOutcome next(final RecordBatch b) {
-    if(!context.shouldContinue()) {
+    if(!context.getExecutorState().shouldContinue()) {
       return IterOutcome.STOP;
     }
     return next(0, b);
@@ -113,7 +113,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     IterOutcome next = null;
     stats.stopProcessing();
     try{
-      if (!context.shouldContinue()) {
+      if (!context.getExecutorState().shouldContinue()) {
         return IterOutcome.STOP;
       }
       next = b.next();
@@ -147,7 +147,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
               return IterOutcome.NONE;
             case OUT_OF_MEMORY:
               // because we don't support schema changes, it is safe to fail the query right away
-              context.fail(UserException.memoryError()
+              context.getExecutorState().fail(UserException.memoryError()
                 .build(logger));
               // FALL-THROUGH
             case STOP:

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 4a9828c..07312a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -85,7 +85,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
-        context.fail(ex);
+        context.getExecutorState().fail(ex);
         return IterOutcome.STOP;
       } finally {
         stats.stopSetup();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index e7c0ee5..381071f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -21,14 +21,10 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.concurrent.Semaphore;
 
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData.RpcType;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.FutureBitCommand;
 import org.apache.drill.exec.rpc.ListeningCommand;
-import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -96,23 +92,6 @@ public class DataTunnel {
     }
   }
 
-  // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture?
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context);
-    try{
-      sendingSemaphore.acquire();
-      manager.runCommand(b);
-    }catch(final InterruptedException e){
-      b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
-
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    }
-    return b.getFuture();
-  }
-
-
   private class ThrottlingOutcomeListener implements RpcOutcomeListener<Ack>{
     RpcOutcomeListener<Ack> inner;
 
@@ -166,26 +145,4 @@ public class DataTunnel {
       super.connectionFailed(type, t);
     }
   }
-
-  private class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
-    final FragmentWritableBatch batch;
-    final FragmentContext context;
-
-    public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) {
-      super();
-      this.batch = batch;
-      this.context = context;
-    }
-
-    @Override
-    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) {
-      connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
-    }
-
-    @Override
-    public String toString() {
-      return "SendBatch [batch.header=" + batch.getHeader() + "]";
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 1f9c81c..5a0e14d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -252,6 +252,14 @@ public class BootStrapContext implements AutoCloseable {
       }
     }
 
+    if (scanExecutor != null) {
+      scanExecutor.shutdown();
+    }
+
+    if (scanDecodeExecutor != null) {
+      scanDecodeExecutor.shutdownNow();
+    }
+
     try {
       AutoCloseables.close(allocator, authProvider);
       shutdown(loop);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
index 90f5623..f0b7243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java
@@ -19,11 +19,12 @@ package org.apache.drill.exec.server.options;
 
 import com.google.common.collect.Maps;
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ops.FragmentContextImpl;
 
 import java.util.Map;
 
 /**
- * {@link OptionManager} that holds options within {@link org.apache.drill.exec.ops.FragmentContext}.
+ * {@link OptionManager} that holds options within {@link FragmentContextImpl}.
  */
 public class FragmentOptionManager extends InMemoryOptionManager {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index f9d44cc..62d46d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContextInterface;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -52,15 +51,6 @@ public class ColumnExplorer {
    * between actual table columns, partition columns and implicit file columns.
    * Also populates map with implicit columns names as keys and their values
    */
-  public ColumnExplorer(FragmentContextInterface context, List<SchemaPath> columns) {
-    this(context.getOptions(), columns);
-  }
-
-  /**
-   * Helper class that encapsulates logic for sorting out columns
-   * between actual table columns, partition columns and implicit file columns.
-   * Also populates map with implicit columns names as keys and their values
-   */
   public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
     this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index f81f74e..d407ec3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -127,7 +127,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
 
   @SuppressWarnings("resource")
   CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
-    final ColumnExplorer columnExplorer = new ColumnExplorer(context, scan.getColumns());
+    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
       scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
       }
 
-    return new ScanBatch(scan, context, oContext, readers, implicitColumns);
+    return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
index f9dfd8b..4e71795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
-public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
-
+public class EasyReaderBatchCreator implements BatchCreator<EasySubScan> {
   @Override
-  public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasySubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children == null || children.isEmpty();
     return config.getFormatPlugin().getReaderBatch(context, config);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
index bfb4188..f3988b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
-
   @Override
-  public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasyWriter config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index 8442c32..c7b8cdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -21,16 +21,14 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class);
-
   @Override
-  public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, DirectSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 4abf7a5..372a91d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -116,7 +116,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
     this.enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
     this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
-    this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
     this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
     this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
     setColumns(columns);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index a9a30e4..8209252 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -86,7 +86,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     if (context.getOptions().getOption(ExecConstants.ENABLE_NEW_TEXT_READER_KEY).bool_val == true) {
       TextParsingSettings settings = new TextParsingSettings();
       settings.set((TextFormatConfig)formatConfig);
-      return new CompliantTextRecordReader(split, dfs, context, settings, columns);
+      return new CompliantTextRecordReader(split, dfs, settings, columns);
     } else {
       char delim = ((TextFormatConfig)formatConfig).getFieldDelimiter();
       return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 7009584..2eafdd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -67,7 +66,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
   // operator context for OutputMutator
   private OperatorContext oContext;
 
-  public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, FragmentContext context, TextParsingSettings settings, List<SchemaPath> columns) {
+  public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, TextParsingSettings settings, List<SchemaPath> columns) {
     this.split = split;
     this.settings = settings;
     this.dfs = dfs;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index ce05543..76e3ae2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -21,17 +21,15 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
-public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class);
-
+public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan> {
   @Override
-  public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
     RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(), context.getOptions());
     return new ScanBatch(config, context, Collections.singletonList(rr));

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
index ac9cb6a..ed01376 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -58,7 +57,7 @@ public class ExtendedMockRecordReader extends AbstractRecordReader {
   private final MockScanEntry config;
   private final ColumnDef fields[];
 
-  public ExtendedMockRecordReader(FragmentContext context, MockScanEntry config) {
+  public ExtendedMockRecordReader(MockScanEntry config) {
     this.config = config;
 
     fields = buildColumnDefs();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 108a621..5c9a87a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -35,8 +35,6 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class MockRecordReader extends AbstractRecordReader {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
   private final MockScanEntry config;
   private final FragmentContext context;
   private ValueVector[] valueVectors;
@@ -62,7 +60,7 @@ public class MockRecordReader extends AbstractRecordReader {
     return x;
   }
 
-  private MaterializedField getVector(String name, MajorType type, int length) {
+  private MaterializedField getVector(String name, MajorType type) {
     assert context != null : "Context shouldn't be null.";
     final MaterializedField f = MaterializedField.create(name, type);
     return f;
@@ -80,7 +78,7 @@ public class MockRecordReader extends AbstractRecordReader {
 
       for (int i = 0; i < config.getTypes().length; i++) {
         final MajorType type = config.getTypes()[i].getMajorType();
-        final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+        final MaterializedField field = getVector(config.getTypes()[i].getName(), type);
         final Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
         valueVectors[i] = output.addField(field, vvClass);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 8f89eff..01a102d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.mock;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -32,17 +32,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
-
   @Override
-  public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     final List<MockScanEntry> entries = config.getReadEntries();
     final List<RecordReader> readers = Lists.newArrayList();
     for(final MockTableDef.MockScanEntry e : entries) {
       if ( e.isExtended( ) ) {
-        readers.add(new ExtendedMockRecordReader(context, e));
+        readers.add(new ExtendedMockRecordReader(e));
       } else {
         readers.add(new MockRecordReader(context, e));
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
index bc4be13..337f220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
 import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
 import org.apache.drill.exec.expr.stat.RangeExprEvaluator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -74,7 +74,7 @@ public class ParquetRGFilterEvaluator {
 
 
   public static boolean canDrop(LogicalExpression expr, Map<SchemaPath, ColumnStatistics> columnStatisticsMap,
-      long rowCount, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry) {
+      long rowCount, UdfUtilities udfUtilities, FunctionLookupContext functionImplementationRegistry) {
     ErrorCollector errorCollector = new ErrorCollectorImpl();
     LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
         expr, columnStatisticsMap, errorCollector, functionImplementationRegistry);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bb0b65f..62948b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -27,7 +27,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -49,7 +49,6 @@ import org.apache.parquet.schema.Type;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-
 public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
 
@@ -59,12 +58,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
   @SuppressWarnings("resource")
   @Override
-  public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
+  public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     OperatorContext oContext = context.newOperatorContext(rowGroupScan);
 
-    final ColumnExplorer columnExplorer = new ColumnExplorer(context, rowGroupScan.getColumns());
+    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
       rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
@@ -154,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
 
-    return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
+    return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
   private static boolean isComplex(ParquetMetadata footer) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
index 79c5709..cfb65d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.parquet;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class);
 
   @Override
-  public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
+  public WriterRecordBatch getBatch(ExecutorFragmentContext context, ParquetWriter config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index ad1c4bf..04e76b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -38,8 +38,7 @@ public class ParquetToDrillTypeConverter {
   }
 
   private static TypeProtos.MinorType getMinorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
-                                                   ConvertedType convertedType, int precision, int scale,
-      OptionManager options) {
+                                                   ConvertedType convertedType, int precision, OptionManager options) {
 
 
     switch (primitiveTypeName) {
@@ -139,7 +138,7 @@ public class ParquetToDrillTypeConverter {
   public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
       TypeProtos.DataMode mode, ConvertedType convertedType, int precision, int scale,
       OptionManager options) {
-    MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, scale, options);
+    MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, options);
     TypeProtos.MajorType.Builder typeBuilder = TypeProtos.MajorType.newBuilder().setMinorType(minorType).setMode(mode);
 
     if (CoreDecimalUtility.isDecimalType(minorType)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
index 6294655..e302e7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java
@@ -22,8 +22,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -37,7 +35,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.joda.time.DateTimeConstants;
-import org.joda.time.DateTimeUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
index 15a56a2..554db10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
 import java.util.TimeZone;
 
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
 import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnectionConfig;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -45,13 +45,13 @@ public class BitToUserConnectionIterator implements Iterator<Object> {
   private String queryingUsername;
   private boolean isAdmin;
 
-  public BitToUserConnectionIterator(FragmentContext context) {
+  public BitToUserConnectionIterator(ExecutorFragmentContext context) {
     queryingUsername = context.getQueryUserName();
     isAdmin = hasAdminPrivileges(context);
     itr = iterateConnectionInfo(context);
   }
 
-  private boolean hasAdminPrivileges(FragmentContext context) {
+  private boolean hasAdminPrivileges(ExecutorFragmentContext context) {
     OptionManager options = context.getOptions();
     if (context.isUserAuthenticationEnabled() &&
         !ImpersonationUtil.hasAdminPrivileges(
@@ -65,11 +65,10 @@ public class BitToUserConnectionIterator implements Iterator<Object> {
     return true;
   }
 
-  private Iterator<ConnectionInfo> iterateConnectionInfo(FragmentContext context) {
-    Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections =
-        context.getDrillbitContext().getUserConnections();
+  private Iterator<ConnectionInfo> iterateConnectionInfo(ExecutorFragmentContext context) {
+    Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections = context.getUserConnections();
 
-    String hostname = context.getIdentity().getAddress();
+    String hostname = context.getEndpoint().getAddress();
     List<ConnectionInfo> connectionInfos = new LinkedList<ConnectionInfo>();
 
     for (Entry<BitToUserConnection, BitToUserConnectionConfig> connection : activeConnections) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index dc4e7c7..08d3706 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys;
 
 import java.util.Iterator;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 public class DrillbitIterator implements Iterator<Object> {
@@ -28,9 +28,9 @@ public class DrillbitIterator implements Iterator<Object> {
   private Iterator<DrillbitEndpoint> endpoints;
   private DrillbitEndpoint current;
 
-  public DrillbitIterator(FragmentContext c) {
-    this.endpoints = c.getDrillbitContext().getAvailableBits().iterator();
-    this.current = c.getIdentity();
+  public DrillbitIterator(ExecutorFragmentContext c) {
+    this.endpoints = c.getBits().iterator();
+    this.current = c.getEndpoint();
   }
 
   public static class DrillbitInstance {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
index e624ada..8d437a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -25,15 +25,15 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 public class MemoryIterator implements Iterator<Object> {
 
   private boolean beforeFirst = true;
-  private final FragmentContext context;
+  private final ExecutorFragmentContext context;
 
-  public MemoryIterator(final FragmentContext context) {
+  public MemoryIterator(final ExecutorFragmentContext context) {
     this.context = context;
   }
 
@@ -50,7 +50,7 @@ public class MemoryIterator implements Iterator<Object> {
     beforeFirst = false;
     final MemoryInfo memoryInfo = new MemoryInfo();
 
-    final DrillbitEndpoint endpoint = context.getIdentity();
+    final DrillbitEndpoint endpoint = context.getEndpoint();
     memoryInfo.hostname = endpoint.getAddress();
     memoryInfo.user_port = endpoint.getUserPort();
 
@@ -62,7 +62,7 @@ public class MemoryIterator implements Iterator<Object> {
     memoryInfo.jvm_direct_current = directBean.getMemoryUsed();
 
 
-    memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
+    memoryInfo.direct_current = context.getAllocator().getAllocatedMemory();
     memoryInfo.direct_max = DrillConfig.getMaxDirectMemory();
     return memoryInfo;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
index e9282c3..eef6604 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.sys;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 
@@ -37,7 +37,7 @@ public class ProfileInfoIterator extends ProfileIterator {
 
   private final Iterator<ProfileInfo> itr;
 
-  public ProfileInfoIterator(FragmentContext context) {
+  public ProfileInfoIterator(ExecutorFragmentContext context) {
     super(context);
     itr = iterateProfileInfo();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
index f1d2075..6112eb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -37,17 +37,15 @@ public abstract class ProfileIterator implements Iterator<Object> {
   protected final String queryingUsername;
   protected final boolean isAdmin;
 
-  public ProfileIterator(FragmentContext context) {
+  public ProfileIterator(ExecutorFragmentContext context) {
     //Grab profile Store Context
-    profileStoreContext = context
-        .getDrillbitContext()
-        .getProfileStoreContext();
+    profileStoreContext = context.getProfileStoreContext();
 
     queryingUsername = context.getQueryUserName();
     isAdmin = hasAdminPrivileges(context);
   }
 
-  protected boolean hasAdminPrivileges(FragmentContext context) {
+  protected boolean hasAdminPrivileges(ExecutorFragmentContext context) {
     OptionManager options = context.getOptions();
     if (context.isUserAuthenticationEnabled() &&
         !ImpersonationUtil.hasAdminPrivileges(

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
index fcc2921..67f1165 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java
@@ -23,7 +23,7 @@ import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.serialization.InstanceSerializer;
@@ -40,7 +40,7 @@ public class ProfileJsonIterator extends ProfileIterator {
   private final InstanceSerializer<QueryProfile> profileSerializer;
   private final Iterator<ProfileJson> itr;
 
-  public ProfileJsonIterator(FragmentContext context) {
+  public ProfileJsonIterator(ExecutorFragmentContext context) {
     super(context);
     //Holding a serializer (for JSON extract)
     profileSerializer = profileStoreContext.

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 034f70c..a49ff9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys;
 
 import java.util.Iterator;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
 
 /**
@@ -33,90 +33,88 @@ import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper;
 public enum SystemTable {
   OPTION("options", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_PUBLIC);
     }
   },
 
   OPTION_VAL("options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new ExtendedOptionIterator(context, false);
     }
   },
 
   INTERNAL_OPTIONS("internal_options", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_INTERNAL);
     }
   },
 
   INTERNAL_OPTIONS_VAL("internal_options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new ExtendedOptionIterator(context, true);
     }
   },
 
   BOOT("boot", false, OptionValueWrapper.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new OptionIterator(context, OptionIterator.Mode.BOOT);
     }
   },
 
   DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new DrillbitIterator(context);
     }
   },
 
   VERSION("version", false, VersionIterator.VersionInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new VersionIterator();
     }
   },
 
   MEMORY("memory", true, MemoryIterator.MemoryInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new MemoryIterator(context);
     }
   },
 
   CONNECTIONS("connections", true, BitToUserConnectionIterator.ConnectionInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new BitToUserConnectionIterator(context);
     }
   },
 
   PROFILES("profiles", false, ProfileInfoIterator.ProfileInfo.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new ProfileInfoIterator(context);
     }
   },
 
   PROFILES_JSON("profiles_json", false, ProfileJsonIterator.ProfileJson.class) {
     @Override
-    public Iterator<Object> getIterator(final FragmentContext context) {
+    public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new ProfileJsonIterator(context);
     }
   },
 
   THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) {
     @Override
-  public Iterator<Object> getIterator(final FragmentContext context) {
+  public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
       return new ThreadsIterator(context);
     }
   };
 
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTable.class);
-
   private final String tableName;
   private final boolean distributed;
   private final Class<?> pojoClass;
@@ -127,7 +125,7 @@ public enum SystemTable {
     this.pojoClass = pojoClass;
   }
 
-  public Iterator<Object> getIterator(final FragmentContext context) {
+  public Iterator<Object> getIterator(final ExecutorFragmentContext context) {
     throw new UnsupportedOperationException(tableName + " must override this method.");
   }
 

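The SystemTable hunk above keeps the enum's existing shape: the base getIterator throws, and each constant that backs a real table overrides it, now with the narrower ExecutorFragmentContext parameter. A minimal sketch of that per-constant-override pattern, using hypothetical names rather than Drill's own classes:

    import java.util.Collections;
    import java.util.Iterator;

    // Hypothetical stand-in for the context type; only here so the sketch compiles.
    interface Context {}

    enum TableSketch {
      VERSION("version") {
        @Override
        public Iterator<Object> getIterator(Context context) {
          // Constants that expose data override the default and return their iterator.
          return Collections.<Object>singletonList("1.0.0").iterator();
        }
      },
      PLACEHOLDER("placeholder");   // relies on the default below, which throws

      private final String tableName;

      TableSketch(String tableName) {
        this.tableName = tableName;
      }

      // Mirrors SystemTable's default: constants must override or calling this fails fast.
      public Iterator<Object> getIterator(Context context) {
        throw new UnsupportedOperationException(tableName + " must override this method.");
      }
    }
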
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index ab87a4a..c0ef0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -39,8 +39,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
-  public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan,
-                              final List<RecordBatch> children)
+  public ScanBatch getBatch(final ExecutorFragmentContext context, final SystemTableScan scan,
+                            final List<RecordBatch> children)
     throws ExecutionSetupException {
     final SystemTable table = scan.getTable();
     final Iterator<Object> iterator = table.getIterator(context);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
index 681119d..567d299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
@@ -21,15 +21,15 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.util.Iterator;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 public class ThreadsIterator implements Iterator<Object> {
 
   private boolean beforeFirst = true;
-  private final FragmentContext context;
+  private final ExecutorFragmentContext context;
 
-  public ThreadsIterator(final FragmentContext context) {
+  public ThreadsIterator(final ExecutorFragmentContext context) {
     this.context = context;
   }
 
@@ -46,7 +46,7 @@ public class ThreadsIterator implements Iterator<Object> {
     beforeFirst = false;
     final ThreadsInfo threadsInfo = new ThreadsInfo();
 
-    final DrillbitEndpoint endpoint = context.getIdentity();
+    final DrillbitEndpoint endpoint = context.getEndpoint();
     threadsInfo.hostname = endpoint.getAddress();
     threadsInfo.user_port = endpoint.getUserPort();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
index c7bdbad..cfcb9e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.testing;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.slf4j.Logger;
 
 public interface ControlsInjector {
@@ -43,19 +43,6 @@ public interface ControlsInjector {
   void injectUnchecked(ExecutionControls executionControls, String desc);
 
   /**
-   * Inject (throw) an unchecked exception at this point, if the fragmentContext is not null,
-   * an injection is specified, and it is time for it to be thrown.
-   * <p/>
-   * <p>Implementors use this in their code at a site where they want to simulate an exception
-   * during testing.
-   *
-   * @param fragmentContext   fragmentContext used to retrieve the controls, can be null
-   * @param desc              the site description
-   *                          throws the exception specified by the injection, if it is time
-   */
-  void injectUnchecked(FragmentContext fragmentContext, String desc);
-
-  /**
    * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
    * for it to be thrown.
    * <p/>

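With the FragmentContext convenience overload gone, call sites are expected to hand the injector an ExecutionControls directly and do their own null guard if one is needed. A hedged sketch of such a helper; the class and method names are illustrative, and only injectUnchecked(ExecutionControls, String) comes from the interface above:

    import org.apache.drill.exec.testing.ControlsInjector;
    import org.apache.drill.exec.testing.ExecutionControls;

    class InjectionSiteSketch {
      // Hypothetical helper mirroring the removed convenience overload: only inject
      // when controls are actually available (some callers may pass null in tests).
      static void injectUncheckedIfPresent(ControlsInjector injector,
                                           ExecutionControls controls, String desc) {
        if (controls != null) {
          injector.injectUnchecked(controls, desc);
        }
      }
    }
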
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 82646ea..78c510a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -131,6 +132,11 @@ public final class ExecutionControls {
 
   private final DrillbitEndpoint endpoint; // the current endpoint
 
+  @VisibleForTesting
+  public ExecutionControls(final OptionManager options) {
+    this(options, null);
+  }
+
   public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) {
     this.endpoint = endpoint;
 

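The new @VisibleForTesting constructor lets tests build ExecutionControls without supplying a DrillbitEndpoint. A small sketch of how a test might use it, assuming the test already holds an OptionManager (for example a mock); the helper class is hypothetical:

    import org.apache.drill.exec.server.options.OptionManager;
    import org.apache.drill.exec.testing.ExecutionControls;

    class ExecutionControlsTestSketch {
      // The endpoint argument is no longer required, so a test can build controls
      // from whatever OptionManager it already has; the endpoint is left null.
      static ExecutionControls controlsFor(OptionManager options) {
        return new ExecutionControls(options);
      }
    }
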
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
index 3a74fee..e5234f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java
@@ -19,16 +19,15 @@ package org.apache.drill.exec.testing;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 
 /**
  * Injects exceptions and pauses at execution time for testing. Any class that wants to simulate exceptions
  * or inject pauses for testing should have its own private static instance of an injector (similar to the use
- * of loggers). Injection site either use {@link org.apache.drill.exec.ops.FragmentContext} or
- * {@link org.apache.drill.exec.ops.QueryContext}. See {@link org.apache.drill.exec.testing.TestExceptionInjection} and
- * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use.
+ * of loggers). Injection sites either use {@link FragmentContextImpl} or
+ * {@link org.apache.drill.exec.ops.QueryContext}.
  * See {@link ControlsInjector} for documentation.
  */
 public class ExecutionControlsInjector implements ControlsInjector {
@@ -60,13 +59,6 @@ public class ExecutionControlsInjector implements ControlsInjector {
   }
 
   @Override
-  public void injectUnchecked(final FragmentContext fragmentContext, final String desc) {
-    if (fragmentContext != null) {
-      injectUnchecked(fragmentContext.getExecutionControls(), desc);
-    }
-  }
-
-  @Override
   public <T extends Throwable> void injectChecked(final ExecutionControls executionControls, final String desc,
                                                   final Class<T> exceptionClass) throws T {
     Preconditions.checkNotNull(executionControls);

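The class javadoc above describes the intended usage: each class keeps its own private static injector, much like a logger, and calls it at the sites where a test should be able to force a failure or pause. A rough sketch under that assumption; injectors are normally obtained from ControlsInjectorFactory, and the operator class and site description here are made up:

    import org.apache.drill.exec.testing.ControlsInjector;
    import org.apache.drill.exec.testing.ControlsInjectorFactory;
    import org.apache.drill.exec.testing.ExecutionControls;

    class SomeOperatorSketch {
      // One injector per class, declared once, in the same spirit as a logger.
      private static final ControlsInjector injector =
          ControlsInjectorFactory.getInjector(SomeOperatorSketch.class);

      void doWork(ExecutionControls controls) {
        // A no-op in production; a test can register an injection keyed on this
        // class and site description to make execution throw or pause here.
        injector.injectUnchecked(controls, "some-site");
        // real operator work would follow
      }
    }
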
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
index b8ec44d..ae853d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.testing;
 
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.slf4j.Logger;
 
 /**
@@ -48,10 +48,6 @@ public final class NoOpControlsInjector implements ControlsInjector {
   }
 
   @Override
-  public void injectUnchecked(final FragmentContext fragmentContext, final String desc) {
-  }
-
-  @Override
   public <T extends Throwable> void injectChecked(
     final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T {
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 8a3e5b6..793cc01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -29,9 +29,7 @@ import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
 
 import com.google.common.base.Preconditions;
 
-public abstract class AbstractDataCollector implements DataCollector{
-
-  // private final List<MinorFragmentEndpoint> incoming;
+public abstract class AbstractDataCollector implements DataCollector {
   private final int oppositeMajorFragmentId;
   private final AtomicIntegerArray remainders;
   private final AtomicInteger remainingRequired;
@@ -42,7 +40,6 @@ public abstract class AbstractDataCollector implements DataCollector{
 
   /**
    * @param parentAccounter
-   * @param receiver
    * @param numBuffers Number of RawBatchBuffer inputs required to store the incoming data
    * @param bufferCapacity Capacity of each RawBatchBuffer.
    * @param context
@@ -74,7 +71,7 @@ public abstract class AbstractDataCollector implements DataCollector{
       if (spooling) {
         buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i);
       } else {
-        buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId());
+        buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index f15a3e6..382ab1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -35,13 +35,13 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   }
 
   protected interface BufferQueue<T> {
-    public void addOomBatch(RawFragmentBatch batch);
-    public RawFragmentBatch poll() throws IOException;
-    public RawFragmentBatch take() throws IOException, InterruptedException;
-    public boolean checkForOutOfMemory();
-    public int size();
-    public boolean isEmpty();
-    public void add(T obj);
+    void addOomBatch(RawFragmentBatch batch);
+    RawFragmentBatch poll() throws IOException;
+    RawFragmentBatch take() throws IOException, InterruptedException;
+    boolean checkForOutOfMemory();
+    int size();
+    boolean isEmpty();
+    void add(T obj);
   }
 
   protected BufferQueue<T> bufferQueue;
@@ -74,7 +74,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
     // if this fragment is already canceled or failed, we shouldn't need anything more. We do the null check to
     // ensure that tests run.
-    if (context != null && !context.shouldContinue()) {
+    if (context != null && !context.getExecutorState().shouldContinue()) {
       this.kill(context);
     }
 
@@ -102,14 +102,14 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 //  ## Add assertion that all acks have been sent. TODO
   @Override
   public void close() {
-    if (!isTerminated() && context.shouldContinue()) {
+    if (!isTerminated() && context.getExecutorState().shouldContinue()) {
       final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
       throw  new IllegalStateException(msg);
     }
 
     if (!bufferQueue.isEmpty()) {
-      if (context.shouldContinue()) {
-        context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+      if (context.getExecutorState().shouldContinue()) {
+        context.getExecutorState().fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", bufferQueue.size());
       }
       clearBufferWithBody();
@@ -133,7 +133,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
         batch = bufferQueue.poll();
         assertAckSent(batch);
       } catch (IOException e) {
-        context.fail(e);
+        context.getExecutorState().fail(e);
         continue;
       }
       if (batch.getBody() != null) {
@@ -172,7 +172,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
     } catch (final InterruptedException e) {
 
       // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
-      if (!context.shouldContinue()) {
+      if (!context.getExecutorState().shouldContinue()) {
         kill(context);
       } else {
         throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
@@ -184,7 +184,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
       return null;
     }
 
-    if (context.isOverMemoryLimit()) {
+    if (context.getAllocator().isOverLimit()) {
       outOfMemory.set(true);
     }