You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/26 12:44:32 UTC

[08/11] drill git commit: DRILL-5730: Mock testing improvements and interface improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
new file mode 100644
index 0000000..b3c84bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
+import org.apache.drill.exec.server.options.FragmentOptionManager;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.PartitionExplorer;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Contextual objects required for execution of a particular fragment.
+ * This is the implementation; use <tt>FragmentContext</tt>
+ * in code to allow tests to use test-time implementations.
+ */
+public class FragmentContextImpl extends BaseFragmentContext implements ExecutorFragmentContext {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContextImpl.class);
+
+  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
+  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
+
+  private final DrillbitContext context;
+  private final UserClientConnection connection; // is null if this context is for non-root fragment
+  private final QueryContext queryContext; // is null if this context is for non-root fragment
+  private final FragmentStats stats;
+  private final BufferAllocator allocator;
+  private final PlanFragment fragment;
+  private final ContextInformation contextInformation;
+  private IncomingBuffers buffers;
+  private final OptionManager fragmentOptions;
+  private final BufferManager bufferManager;
+  private ExecutorState executorState;
+  private final ExecutionControls executionControls;
+
+  private final SendingAccountor sendingAccountor = new SendingAccountor();
+  private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
+    @Override
+    public void accept(final RpcException e) {
+      fail(e);
+    }
+
+    @Override
+    public void interrupt(final InterruptedException e) {
+      if (executorState.shouldContinue()) {
+        logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e);
+        fail(e);
+      }
+    }
+  };
+
+  private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+  private final AccountingUserConnection accountingUserConnection;
+  /** Stores constants and their holders by type */
+  private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
+
+  /**
+   * Create a FragmentContext instance for non-root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
+  public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment,
+                             final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, null, funcRegistry);
+  }
+
+  /**
+   * Create a FragmentContext instance for root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param queryContext QueryContext.
+   * @param connection UserClientConnection.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
+  public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
+                             final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
+    throws ExecutionSetupException {
+    super(funcRegistry);
+    this.context = dbContext;
+    this.queryContext = queryContext;
+    this.connection = connection;
+    this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
+    this.fragment = fragment;
+    contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext());
+
+    logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
+    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+
+    final OptionList list;
+    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
+      list = new OptionList();
+    } else {
+      try {
+        list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
+      } catch (final Exception e) {
+        throw new ExecutionSetupException("Failure while reading plan options.", e);
+      }
+    }
+    fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+
+    executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());
+
+    // Add the fragment context to the root allocator.
+    // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
+    try {
+      allocator = context.getAllocator().newChildAllocator(
+          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
+          fragment.getMemInitial(),
+          fragment.getMemMax());
+      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
+    } catch (final OutOfMemoryException e) {
+      throw UserException.memoryError(e)
+        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
+        .build(logger);
+    } catch(final Throwable e) {
+      throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
+    }
+
+    stats = new FragmentStats(allocator, fragment.getAssignment());
+    bufferManager = new BufferManagerImpl(this.allocator);
+    constantValueHolderCache = Maps.newHashMap();
+  }
+
+  /**
+   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+   * the long list of test files.
+   */
+  public FragmentContextImpl(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+                             FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, connection, funcRegistry);
+  }
+
+  @Override
+  public OptionManager getOptions() {
+    return fragmentOptions;
+  }
+
+  @Override
+  public PhysicalPlanReader getPlanReader() {
+    return context.getPlanReader();
+  }
+
+  @Override
+  public ClusterCoordinator getClusterCoordinator() {
+    return context.getClusterCoordinator();
+  }
+
+  @Override
+  public void setBuffers(final IncomingBuffers buffers) {
+    Preconditions.checkArgument(this.buffers == null, "Can only set buffers once.");
+    this.buffers = buffers;
+  }
+  public QueryProfileStoreContext getProfileStoreContext() {
+    return context.getProfileStoreContext();
+  }
+
+  @Override
+  public Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections() {
+    return context.getUserConnections();
+  }
+
+  public void setExecutorState(final ExecutorState executorState) {
+    Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once.");
+    this.executorState = executorState;
+  }
+
+  public void fail(final Throwable cause) {
+    executorState.fail(cause);
+  }
+
+  public SchemaPlus getFullRootSchema() {
+    if (queryContext == null) {
+      fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
+          "This is a non-root fragment."));
+      return null;
+    }
+
+    final boolean isImpersonationEnabled = isImpersonationEnabled();
+    // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for
+    // InfoSchema purpose we want to show tables the user has permissions to list or query. If  impersonation is
+    // disabled view the schema as Drillbit process user and throw authorization errors to client.
+    SchemaConfig schemaConfig = SchemaConfig
+        .newBuilder(
+            isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(),
+            queryContext)
+        .setIgnoreAuthErrors(isImpersonationEnabled)
+        .build();
+
+    return queryContext.getFullRootSchema(schemaConfig);
+  }
+
+  public FragmentStats getStats() {
+    return stats;
+  }
+
+  @Override
+  public Collection<DrillbitEndpoint> getBits() {
+    return context.getBits();
+  }
+
+  @Override
+  public ContextInformation getContextInformation() {
+    return contextInformation;
+  }
+
+  @Override
+  public DrillbitEndpoint getForemanEndpoint() {
+    return fragment.getForeman();
+  }
+
+  @Override
+  public DrillbitEndpoint getEndpoint() {
+    return context.getEndpoint();
+  }
+
+  @Override
+  public Controller getController() {
+    return context.getController();
+  }
+
+  @Override
+  public OperatorCreatorRegistry getOperatorCreatorRegistry() {
+    return context.getOperatorCreatorRegistry();
+  }
+
+  @Override
+  public ExecutorService getScanDecodeExecutor() {
+    return context.getScanDecodeExecutor();
+  }
+
+  @Override
+  public ExecutorService getScanExecutor() {
+    return context.getScanExecutor();
+  }
+
+  /**
+   * The FragmentHandle for this Fragment
+   * @return FragmentHandle
+   */
+  public FragmentHandle getHandle() {
+    return fragment.getHandle();
+  }
+
+  public String getFragIdString() {
+    final FragmentHandle handle = getHandle();
+    final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
+    return frag;
+  }
+
+  @Override
+  public boolean isUserAuthenticationEnabled() {
+    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is
+    // no config
+    if (getConfig() == null) {
+      return false;
+    }
+
+    return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
+  }
+
+  /**
+   * Get this fragment's allocator.
+   * @return the allocator
+   */
+  @Deprecated
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
+    }
+    return allocator;
+  }
+
+  @Override
+  public BufferAllocator getNewChildAllocator(final String operatorName,
+      final int operatorId,
+      final long initialReservation,
+      final long maximumReservation) throws OutOfMemoryException {
+    return allocator.newChildAllocator(
+        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
+        initialReservation,
+        maximumReservation
+        );
+  }
+
+  public boolean isOverMemoryLimit() {
+    return allocator.isOverLimit();
+  }
+
+  @Override
+  public CodeCompiler getCompiler() {
+    return context.getCompiler();
+  }
+
+  @Override
+  public AccountingUserConnection getUserDataTunnel() {
+    Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel");
+    return accountingUserConnection;
+  }
+
+  @Override
+  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) {
+    AccountingDataTunnel tunnel = tunnels.get(endpoint);
+    if (tunnel == null) {
+      tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler);
+      tunnels.put(endpoint, tunnel);
+    }
+    return tunnel;
+  }
+
+  public IncomingBuffers getBuffers() {
+    return buffers;
+  }
+
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
+      throws OutOfMemoryException {
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
+    contexts.add(context);
+    return context;
+  }
+
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig)
+      throws OutOfMemoryException {
+    OperatorContextImpl context = new OperatorContextImpl(popConfig, this);
+    contexts.add(context);
+    return context;
+  }
+
+  @Override
+  public DrillConfig getConfig() {
+    return context.getConfig();
+  }
+
+  @Override
+  public ExecutorState getExecutorState() {
+    return executorState;
+  }
+
+  @Override
+  public ExecutionControls getExecutionControls() {
+    return executionControls;
+  }
+
+  public String getQueryUserName() {
+    return fragment.getCredentials().getUserName();
+  }
+
+  public boolean isImpersonationEnabled() {
+    // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is
+    // no config
+    if (getConfig() == null) {
+      return false;
+    }
+
+    return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED);
+  }
+
+  @Override
+  public void close() {
+    waitForSendComplete();
+
+    // close operator context
+    for (OperatorContextImpl opContext : contexts) {
+      suppressingClose(opContext);
+    }
+
+    suppressingClose(bufferManager);
+    suppressingClose(buffers);
+    suppressingClose(allocator);
+  }
+
+  private void suppressingClose(final AutoCloseable closeable) {
+    try {
+      if (closeable != null) {
+        closeable.close();
+      }
+    } catch (final Exception e) {
+      fail(e);
+    }
+  }
+
+  @Override
+  public PartitionExplorer getPartitionExplorer() {
+    throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " +
+        "in functions that can be evaluated at planning time. Make sure that the %s configuration " +
+        "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName()));
+  }
+
+  @Override
+  public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) {
+    if (!constantValueHolderCache.containsKey(value)) {
+      constantValueHolderCache.put(value, Maps.<MinorType, ValueHolder>newHashMap());
+    }
+
+    Map<MinorType, ValueHolder> holdersByType = constantValueHolderCache.get(value);
+    ValueHolder valueHolder = holdersByType.get(type);
+    if (valueHolder == null) {
+      valueHolder = holderInitializer.apply(getManagedBuffer());
+      holdersByType.put(type, valueHolder);
+    }
+    return valueHolder;
+  }
+
+  public ExecutorService getExecutor(){
+    return context.getExecutor();
+  }
+
+  @Override
+  public void waitForSendComplete() {
+    sendingAccountor.waitForSendComplete();
+  }
+
+  @Override
+  public WorkEventBus getWorkEventbus() {
+    return context.getWorkBus();
+  }
+
+  public boolean isBuffersDone() {
+    Preconditions.checkState(this.buffers != null, "Incoming Buffers is not set in this fragment context");
+    return buffers.isDone();
+  }
+
+  @Override
+  protected BufferManager getBufferManager() {
+    return bufferManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 3d2fdd8..e6ea5b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -60,7 +60,7 @@ public interface OperatorContext {
 
   BufferAllocator getAllocator();
 
-  FragmentContextInterface getFragmentContext();
+  FragmentContext getFragmentContext();
 
   DrillBuf replace(DrillBuf old, int newSize);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index e4c7dd9..550cea2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -43,11 +43,11 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
    */
   private ListeningExecutorService delegatePool;
 
-  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context) throws OutOfMemoryException {
     this(popConfig, context, null);
   }
 
-  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
+  public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context, OperatorStats stats)
       throws OutOfMemoryException {
     super(context,
           context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
@@ -74,12 +74,8 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable {
       return;
     }
     logger.debug("Closing context for {}", popConfig != null ? getName() : null);
-
-    try {
-      super.close();
-    } finally {
-      closed = true;
-    }
+    closed = true;
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index a38c3c2..e744f1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -35,8 +35,6 @@ import com.carrotsearch.hppc.procedures.IntLongProcedure;
 import com.google.common.annotations.VisibleForTesting;
 
 public class OperatorStats {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
-
   protected final int operatorId;
   protected final int operatorType;
   private final BufferAllocator allocator;
@@ -61,7 +59,6 @@ public class OperatorStats {
   private long setupMark;
   private long waitMark;
 
-//  private long schemas;
   private int inputCount;
 
   public OperatorStats(OpProfileDef def, BufferAllocator allocator){

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
new file mode 100644
index 0000000..a8dab2c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+public interface RootFragmentContext extends ExchangeFragmentContext {
+  FragmentStats getStats();
+
+  void setExecutorState(final ExecutorState executorState);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index 6752d76..04d29e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -36,7 +36,7 @@ public interface UdfUtilities {
 
   // Map between injectable classes and their respective getter methods
   // used for code generation
-  public static final ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS =
+  ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS =
       new ImmutableMap.Builder<Class<?>, String>()
           .put(DrillBuf.class, "getManagedBuffer")
           .put(PartitionExplorer.class, "getPartitionExplorer")

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index b4c8536..b93237e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.base;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
@@ -83,12 +82,6 @@ public abstract class AbstractBase implements PhysicalOperator {
     return SelectionVectorMode.NONE;
   }
 
-  // Not available. Presumably because Drill does not currently use
-  // this value, though it does appear in some test physical plans.
-//  public void setInitialAllocation(long alloc) {
-//    initialAllocation = alloc;
-//  }
-
   @Override
   public long getInitialAllocation() {
     return initialAllocation;
@@ -116,7 +109,6 @@ public abstract class AbstractBase implements PhysicalOperator {
   @Override
   public void setMaxAllocation(long maxAllocation) {
     this.maxAllocation = maxAllocation;
-    /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
   }
 
   /**
@@ -126,9 +118,6 @@ public abstract class AbstractBase implements PhysicalOperator {
   @Override @JsonIgnore
   public boolean isBufferedOperator() { return false; }
 
-  // @Override
-  // public void setBufferedOperator(boolean bo) {}
-
   @Override
   public String getUserName() {
     return userName;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index d8014f5..35c371c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -35,7 +35,7 @@ public interface Exchange extends PhysicalOperator {
    * Assignment dependency describes whether sender fragments depend on receiver fragment's endpoint assignment for
    * determining its parallelization and endpoint assignment and vice versa.
    */
-  public enum ParallelizationDependency {
+  enum ParallelizationDependency {
     SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving fragment for parallelization
     RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending fragment for parallelization (default value).
   }
@@ -46,7 +46,7 @@ public interface Exchange extends PhysicalOperator {
    *
    * @param senderLocations
    */
-  public abstract void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException;
+  void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException;
 
   /**
    * Inform this Exchange node about its receiver locations. This list should be index-ordered the same as the expected
@@ -54,7 +54,7 @@ public interface Exchange extends PhysicalOperator {
    *
    * @param receiverLocations
    */
-  public abstract void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException;
+  void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException;
 
   /**
    * Get the Sender associated with the given minorFragmentId. Cannot be called until after setupSenders() and
@@ -66,7 +66,7 @@ public interface Exchange extends PhysicalOperator {
    *          The feeding node for the requested sender.
    * @return The materialized sender for the given arguments.
    */
-  public abstract Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException;
+  Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException;
 
   /**
    * Get the Receiver associated with the given minorFragmentId. Cannot be called until after setupSenders() and
@@ -76,7 +76,7 @@ public interface Exchange extends PhysicalOperator {
    *          The minor fragment id, must be in the range [0, fragment.width).
    * @return The materialized recevier for the given arguments.
    */
-  public abstract Receiver getReceiver(int minorFragmentId);
+  Receiver getReceiver(int minorFragmentId);
 
   /**
    * Provide parallelization parameters for sender side of the exchange. Output includes min width,
@@ -86,7 +86,7 @@ public interface Exchange extends PhysicalOperator {
    * @return
    */
   @JsonIgnore
-  public abstract ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints);
+  ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints);
 
   /**
    * Provide parallelization parameters for receiver side of the exchange. Output includes min width,
@@ -96,18 +96,18 @@ public interface Exchange extends PhysicalOperator {
    * @return
    */
   @JsonIgnore
-  public abstract ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints);
+  ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints);
 
   /**
    * Return the feeding child of this operator node.
    *
    * @return
    */
-  public PhysicalOperator getChild();
+  PhysicalOperator getChild();
 
   /**
    * Get the parallelization dependency of the Exchange.
    */
   @JsonIgnore
-  public ParallelizationDependency getParallelizationDependency();
+  ParallelizationDependency getParallelizationDependency();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 980f32c..7b76194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -47,7 +47,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    * @return
    */
   @JsonIgnore
-  public boolean isExecutable();
+  boolean isExecutable();
 
   /**
    * Describes the SelectionVector Mode for the output steam from this physical op.
@@ -55,7 +55,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    * @return
    */
   @JsonIgnore
-  public SelectionVectorMode getSVMode();
+  SelectionVectorMode getSVMode();
 
   /**
    * Provides capability to build a set of output based on traversing a query graph tree.
@@ -63,7 +63,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    * @param physicalVisitor
    * @return
    */
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
+  <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
 
   /**
    * Regenerate with this node with a new set of children.  This is used in the case of materialization or optimization.
@@ -71,44 +71,44 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    * @return
    */
   @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
+  PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
 
   /**
    * @return The memory to preallocate for this operator
    */
-  public long getInitialAllocation();
+  long getInitialAllocation();
 
   /**
    * @return The maximum memory this operator can allocate
    */
-  public long getMaxAllocation();
+  long getMaxAllocation();
 
   /**
    *
    * @param maxAllocation The max memory allocation to be set
    */
-  public void setMaxAllocation(long maxAllocation);
+  void setMaxAllocation(long maxAllocation);
 
   /**
    *
    * @return True iff this operator manages its memory (including disk spilling)
    */
   @JsonIgnore
-  public boolean isBufferedOperator();
+  boolean isBufferedOperator();
 
   // public void setBufferedOperator(boolean bo);
 
   @JsonProperty("@id")
-  public int getOperatorId();
+  int getOperatorId();
 
   @JsonProperty("@id")
-  public void setOperatorId(int id);
+  void setOperatorId(int id);
 
   @JsonProperty("cost")
-  public void setCost(double cost);
+  void setCost(double cost);
 
   @JsonProperty("cost")
-  public double getCost();
+  double getCost();
 
   /**
    * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
@@ -116,8 +116,8 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
    * @return
    */
   @JsonProperty("userName")
-  public String getUserName();
+  String getUserName();
 
   @JsonIgnore
-  public int getOperatorType();
+  int getOperatorType();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index d01e294..82887ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -21,11 +21,11 @@ import java.util.List;
 
 import org.apache.drill.common.DeferredException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.ops.OperatorUtilities;
+import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -39,10 +39,10 @@ public abstract class BaseRootExec implements RootExec {
 
   protected OperatorStats stats = null;
   protected OperatorContext oContext = null;
-  protected FragmentContext fragmentContext = null;
+  protected RootFragmentContext fragmentContext = null;
   private List<CloseableRecordBatch> operators;
 
-  public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = fragmentContext.newOperatorContext(config, stats);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -51,8 +51,8 @@ public abstract class BaseRootExec implements RootExec {
     this.fragmentContext = fragmentContext;
   }
 
-  public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext,
-      final PhysicalOperator config) throws OutOfMemoryException {
+  public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext,
+                      final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
         config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -87,7 +87,7 @@ public abstract class BaseRootExec implements RootExec {
   public final boolean next() {
     // Stats should have been initialized
     assert stats != null;
-    if (!fragmentContext.shouldContinue()) {
+    if (!fragmentContext.getExecutorState().shouldContinue()) {
       return false;
     }
     try {
@@ -156,7 +156,7 @@ public abstract class BaseRootExec implements RootExec {
       try {
         df.close();
       } catch (Exception e) {
-        fragmentContext.fail(e);
+        fragmentContext.getExecutorState().fail(e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index af99b5e..e9113ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,14 +20,12 @@ package org.apache.drill.exec.physical.impl;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface BatchCreator<T extends PhysicalOperator> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
-
-  public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children)
+  CloseableRecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children)
       throws ExecutionSetupException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index b418fd4..2e0d14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -66,7 +66,7 @@ public class ImplCreator {
    * @return RootExec of fragment.
    * @throws ExecutionSetupException
    */
-  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+  public static RootExec getExec(ExecutorFragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     Preconditions.checkNotNull(root);
     Preconditions.checkNotNull(context);
 
@@ -99,14 +99,14 @@ public class ImplCreator {
       return rootExec;
     } catch(Exception e) {
       AutoCloseables.close(e, creator.getOperators());
-      context.fail(e);
+      context.getExecutorState().fail(e);
     }
     return null;
   }
 
   /** Create RootExec and its children (RecordBatches) for given FragmentRoot */
-  @SuppressWarnings("unchecked")
-  private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+
+  private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException {
     final List<RecordBatch> childRecordBatches = getChildren(root, context);
 
     if (context.isImpersonationEnabled()) {
@@ -131,7 +131,7 @@ public class ImplCreator {
 
   /** Create a RecordBatch and its children for given PhysicalOperator */
   @VisibleForTesting
-  public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+  public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
     Preconditions.checkNotNull(op);
 
     final List<RecordBatch> childRecordBatches = getChildren(op, context);
@@ -164,9 +164,9 @@ public class ImplCreator {
   }
 
   /** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */
-  private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+  private Object getOpCreator(PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
     final Class<? extends PhysicalOperator> opClass = op.getClass();
-    Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+    Object opCreator = context.getOperatorCreatorRegistry().getOperatorCreator(opClass);
     if (opCreator == null) {
       throw new UnsupportedOperationException(
           String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
@@ -176,7 +176,7 @@ public class ImplCreator {
   }
 
   /** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */
-  private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+  private List<RecordBatch> getChildren(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
     List<RecordBatch> children = Lists.newArrayList();
     for (PhysicalOperator child : op) {
       children.add(getRecordBatch(child, context));
@@ -184,5 +184,4 @@ public class ImplCreator {
 
     return children;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 690a662..0ef84b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -33,7 +33,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
 
   @SuppressWarnings("resource")
   @Override
-  public MergingRecordBatch getBatch(FragmentContext context,
+  public MergingRecordBatch getBatch(ExecutorFragmentContext context,
                               MergingReceiverPOP receiver,
                               List<RecordBatch> children)
       throws ExecutionSetupException, OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
index f3d9524..17e4027 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.physical.impl;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface RootCreator<T extends PhysicalOperator> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
-
-  public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+  RootExec getRoot(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index e0d1545..f6a4863 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,14 +82,13 @@ public class ScanBatch implements CloseableRecordBatch {
   private String currentReaderClassName;
   /**
    *
-   * @param subScanConfig
    * @param context
    * @param oContext
    * @param readerList
    * @param implicitColumnList : either an emptylist when all the readers do not have implicit
    *                        columns, or there is a one-to-one mapping between reader and implicitColumns.
    */
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+  public ScanBatch(FragmentContext context,
                    OperatorContext oContext, List<RecordReader> readerList,
                    List<Map<String, String>> implicitColumnList) {
     this.context = context;
@@ -126,8 +125,7 @@ public class ScanBatch implements CloseableRecordBatch {
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
                    List<RecordReader> readers)
       throws ExecutionSetupException {
-    this(subScanConfig, context,
-        context.newOperatorContext(subScanConfig),
+    this(context, context.newOperatorContext(subScanConfig),
         readers, Collections.<Map<String, String>> emptyList());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index d9abf40..46dc450 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingUserConnection;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -38,11 +39,10 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import com.google.common.base.Preconditions;
 
 public class ScreenCreator implements RootCreator<Screen> {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class);
 
   @Override
-  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children)
+  public RootExec getRoot(ExecutorFragmentContext context, Screen config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkNotNull(children);
     Preconditions.checkArgument(children.size() == 1);
@@ -52,7 +52,7 @@ public class ScreenCreator implements RootCreator<Screen> {
   public static class ScreenRoot extends BaseRootExec {
     private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     private final RecordBatch incoming;
-    private final FragmentContext context;
+    private final RootFragmentContext context;
     private final AccountingUserConnection userConnection;
     private RecordMaterializer materializer;
 
@@ -67,13 +67,17 @@ public class ScreenCreator implements RootCreator<Screen> {
       }
     }
 
-    public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+    public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
       super(context, config);
       this.context = context;
       this.incoming = incoming;
       userConnection = context.getUserDataTunnel();
     }
 
+    public RootFragmentContext getContext() {
+      return context;
+    }
+
     @Override
     public boolean innerNext() {
       IterOutcome outcome = next(incoming);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2f33193..9231aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -36,7 +37,7 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 public class SingleSenderCreator implements RootCreator<SingleSender>{
 
   @Override
-  public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
+  public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children)
       throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return new SingleSenderRootExec(context, children.iterator().next(), config);
@@ -64,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       }
     }
 
-    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+    public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
       super(context, context.newOperatorContext(config, null), config);
       this.incoming = batch;
       assert incoming != null;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 7f9aca4..97e26b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -26,7 +26,6 @@ import javax.inject.Named;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 442a753..1683286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -281,7 +281,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
       kill(false);
       logger.error("Failure during query", ex);
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return IterOutcome.STOP;
     }
   }
@@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
     SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     if (copier == null) {
-      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
+      copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null);
     } else {
       for (VectorWrapper<?> i : batch) {
 
@@ -323,7 +323,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       selectionVector4.clear();
       c.clear();
       VectorContainer newQueue = new VectorContainer();
-      builder.build(context, newQueue);
+      builder.build(newQueue);
       priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
       builder.getSv4().clear();
       selectionVector4.clear();
@@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
     throws SchemaChangeException, ClassTransformationException, IOException {
     return createNewPriorityQueue(
-      mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(),
+      mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(),
       config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
   }
 
@@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
     final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
-    copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
+    copier = RemovingRecordBatch.getGenerated4Copier(batch, context,  newContainer, newBatch, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
     try {
@@ -434,7 +434,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       selectionVector4.clear();
       c.clear();
       final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
-      builder.build(context, oldSchemaContainer);
+      builder.build(oldSchemaContainer);
       oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
       final VectorContainer newSchemaContainer =  SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
       newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
index e815bff..d477744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.TopN;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class TopNSortBatchCreator implements BatchCreator<TopN>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class);
-
+public class TopNSortBatchCreator implements BatchCreator<TopN> {
   @Override
-  public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children)
+  public TopNBatch getBatch(ExecutorFragmentContext context, TopN config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new TopNBatch(config, context, children.iterator().next());
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 0ea17d6..e98a7c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -118,7 +118,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     } catch(IOException ex) {
       logger.error("Failure during query", ex);
       kill(false);
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       return IterOutcome.STOP;
     }
 
@@ -185,7 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     try {
       recordWriter.cleanup();
     } catch(IOException ex) {
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
     } finally {
       try {
         if (!processed) {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b3d68d3..47f1017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -186,7 +186,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       return aggregator.getOutcome();
 
     case UPDATE_AGGREGATOR:
-      context.fail(UserException.unsupportedError()
+      context.getExecutorState().fail(UserException.unsupportedError()
           .message(SchemaChangeException.schemaChanged(
               "Hash aggregate does not support schema change",
               incomingSchema,
@@ -212,7 +212,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       this.aggregator = createAggregatorInternal();
       return true;
     } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       container.clear();
       incoming.kill(false);
       return false;
@@ -227,9 +227,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     ClassGenerator<HashAggregator> cg = top.getRoot();
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
     top.plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    // top.saveCodeForDebugging(true);
-
     container.clear();
 
     int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
index 1397342..ed203e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class HashAggBatchCreator implements BatchCreator<HashAggregate>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class);
-
+public class HashAggBatchCreator implements BatchCreator<HashAggregate> {
   @Override
-  public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children)
+  public HashAggBatch getBatch(ExecutorFragmentContext context, HashAggregate config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new HashAggBatch(config, children.iterator().next(), context);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 89ba59b..ef8d9d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -300,7 +300,9 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
+                    RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
+                    TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 16b5499..3384e67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -35,34 +35,35 @@ import org.apache.drill.exec.record.VectorContainer;
 
 public interface HashAggregator {
 
-  public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
+  TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
       new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
 
-  public static enum AggOutcome {
+  enum AggOutcome {
     RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
   }
 
   // For returning results from outputCurrentBatch
   // OK - batch returned, NONE - end of data, RESTART - call again
-  public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
+  enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
 
-  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+  void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
 
-  public abstract IterOutcome getOutcome();
+  IterOutcome getOutcome();
 
-  public abstract int getOutputCount();
+  int getOutputCount();
 
-  public abstract AggOutcome doWork();
+  AggOutcome doWork();
 
-  public abstract void cleanup();
+  void cleanup();
 
-  public abstract boolean allFlushed();
+  boolean allFlushed();
 
-  public abstract boolean buildComplete();
+  boolean buildComplete();
 
-  public abstract AggIterOutcome outputCurrentBatch();
+  AggIterOutcome outputCurrentBatch();
 
-  public abstract boolean earlyOutput();
+  boolean earlyOutput();
 
-  public abstract RecordBatch getNewIncoming();
+  RecordBatch getNewIncoming();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index ac4b29d..c473b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -45,19 +45,15 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   private int spilledBatches;
   private FragmentContext context;
   private BatchSchema schema;
-  private OperatorContext oContext;
   private SpillSet spillSet;
-  // Path spillStreamPath;
   private String spillFile;
   VectorAccessibleSerializable vas;
 
-  public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+  public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
     this.context = context;
     this.schema = schema;
     this.spilledBatches = spilledBatches;
-    this.oContext = oContext;
     this.spillSet = spillSet;
-    //this.spillStreamPath = spillStreamPath;
     this.spillFile = spillFile;
     vas = new VectorAccessibleSerializable(oContext.getAllocator());
     container = vas.get();
@@ -126,7 +122,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   @Override
   public IterOutcome next() {
 
-    if ( ! context.shouldContinue() ) { return IterOutcome.STOP; }
+    if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
 
     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index b33dbd6..34ab97e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -202,7 +202,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       }
       return outcome;
     case UPDATE_AGGREGATOR:
-      context.fail(UserException.unsupportedError()
+      context.getExecutorState().fail(UserException.unsupportedError()
         .message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
         .build(logger));
       close();
@@ -263,7 +263,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       this.aggregator = createAggregatorInternal();
       return true;
     } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
-      context.fail(ex);
+      context.getExecutorState().fail(ex);
       container.clear();
       incoming.kill(false);
       return false;
@@ -275,8 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-    // cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
index cac5b06..864271e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
 
 import com.google.common.base.Preconditions;
 
-public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class);
-
+public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate> {
   @Override
-  public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children)
+  public StreamingAggBatch getBatch(ExecutorFragmentContext context, StreamingAggregate config, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     return new StreamingAggBatch(config, children.iterator().next(), context);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 61c82d8..7e13eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
index 01122be..6c42a1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.RootCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -30,7 +30,7 @@ import com.google.common.collect.Iterators;
 
 public class BroadcastSenderCreator implements RootCreator<BroadcastSender> {
   @Override
-  public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
+  public RootExec getRoot(ExecutorFragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
     assert children != null && children.size() == 1;
     return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 80d7744..bd4a1ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,8 @@ import java.util.List;
 
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -57,7 +57,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     }
   }
 
-  public BroadcastSenderRootExec(FragmentContext context,
+  public BroadcastSenderRootExec(RootFragmentContext context,
                                  RecordBatch incoming,
                                  BroadcastSender config) throws OutOfMemoryException {
     super(context, context.newOperatorContext(config, null), config);

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 9bab67d..703868e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -225,12 +225,11 @@ public class ChainedHashTable {
     setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
 
     HashTable ht = context.getImplementationClass(top);
-    ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
+    ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
 
     return ht;
   }
 
-
   private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
       LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
       throws SchemaChangeException {

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 3749e3e..d28fe49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,64 +20,58 @@ package org.apache.drill.exec.physical.impl.common;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 
 public interface HashTable {
-
-  public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
-      new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class);
-
-  /**
-   * The initial default capacity of the hash table (in terms of number of buckets).
-   */
-  static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16;
+  TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(HashTable.class, HashTableTemplate.class);
 
   /**
    * The maximum capacity of the hash table (in terms of number of buckets).
    */
-  static final public int MAXIMUM_CAPACITY = 1 << 30;
+  int MAXIMUM_CAPACITY = 1 << 30;
 
   /**
    * The default load factor of a hash table.
    */
-  static final public float DEFAULT_LOAD_FACTOR = 0.75f;
+  float DEFAULT_LOAD_FACTOR = 0.75f;
 
-  static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
+  enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
 
   /**
    * The batch size used for internal batch holders
    */
-  static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
-  static final public int BATCH_MASK = 0x0000FFFF;
+  int BATCH_SIZE = Character.MAX_VALUE + 1;
+  int BATCH_MASK = 0x0000FFFF;
 
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
+  void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
+             VectorContainer htContainerOrig);
 
-  public void updateBatches() throws SchemaChangeException;
+  void updateBatches() throws SchemaChangeException;
 
-  public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
+  int getHashCode(int incomingRowIdx) throws SchemaChangeException;
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
+  PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
 
-  public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
+  int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
 
-  public void getStats(HashTableStats stats);
+  void getStats(HashTableStats stats);
 
-  public int size();
+  int size();
 
-  public boolean isEmpty();
+  boolean isEmpty();
 
-  public void clear();
+  void clear();
 
-  public void reinit(RecordBatch newIncoming);
+  void reinit(RecordBatch newIncoming);
 
-  public void reset();
+  void reset();
 
-  public void setMaxVarcharSize(int size);
+  void setMaxVarcharSize(int size);
 
-  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+  boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 6cbbdcb..272d782 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
@@ -48,7 +47,6 @@ public abstract class HashTableTemplate implements HashTable {
   private static final boolean EXTRA_DEBUG = false;
 
   private static final int EMPTY_SLOT = -1;
-  // private final int MISSING_VALUE = 65544;
 
   // A hash 'bucket' consists of the start index to indicate start of a hash chain
 
@@ -78,8 +76,6 @@ public abstract class HashTableTemplate implements HashTable {
   // Placeholder for the current index while probing the hash table
   private IndexPointer currentIdxHolder;
 
-//  private FragmentContext context;
-
   private BufferAllocator allocator;
 
   // The incoming build side record batch
@@ -451,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable {
 
 
   @Override
-  public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -470,7 +466,6 @@ public abstract class HashTableTemplate implements HashTable {
     }
 
     this.htConfig = htConfig;
-//    this.context = context;
     this.allocator = allocator;
     this.incomingBuild = incomingBuild;
     this.incomingProbe = incomingProbe;