You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/14 06:42:08 UTC

[3/8] drill git commit: DRILL-3079: Move execution fragment json parsing from RPC message to fragment start.

DRILL-3079: Move execution fragment json parsing from RPC message to fragment start.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ad42611
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ad42611
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ad42611

Branch: refs/heads/master
Commit: 4ad426117512b1a6ec65ab15274ebc33db33b5a8
Parents: 814f553
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 13 12:55:01 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 13 19:33:55 2015 -0700

----------------------------------------------------------------------
 .../planner/fragment/SimpleParallelizer.java    |   55 +-
 .../exec/work/batch/AbstractDataCollector.java  |   38 +-
 .../exec/work/batch/ControlMessageHandler.java  |    5 +-
 .../drill/exec/work/batch/IncomingBuffers.java  |   52 +-
 .../drill/exec/work/batch/MergingCollector.java |    6 +-
 .../exec/work/batch/PartitionedCollector.java   |    6 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    9 +-
 .../exec/work/fragment/FragmentExecutor.java    |   36 +-
 .../work/fragment/NonRootFragmentManager.java   |   15 +-
 .../org/apache/drill/exec/proto/BitControl.java | 1531 +++++++++++++++---
 .../drill/exec/proto/SchemaBitControl.java      |  141 ++
 .../drill/exec/proto/beans/Collector.java       |  239 +++
 .../drill/exec/proto/beans/PlanFragment.java    |   34 +
 protocol/src/main/protobuf/BitControl.proto     |    8 +
 14 files changed, 1870 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index d36ad42..e04a8a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.fragment;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -26,22 +27,22 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Ordering;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -54,8 +55,12 @@ import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 
 /**
  * The simple parallelizer determines the level of parallelization of a plan based on the cost of the underlying
@@ -367,6 +372,7 @@ public class SimpleParallelizer {
             .setMinorFragmentId(minorFragmentId) //
             .setQueryId(queryId) //
             .build();
+
         PlanFragment fragment = PlanFragment.newBuilder() //
             .setForeman(foremanNode) //
             .setFragmentJson(plan) //
@@ -378,6 +384,7 @@ public class SimpleParallelizer {
             .setMemMax(wrapper.getMaxAllocation())
             .setOptionsJson(optionsData)
             .setCredentials(session.getCredentials())
+            .addAllCollector(CountRequiredFragments.getCollectors(root))
             .build();
 
         if (isRootNode) {
@@ -393,4 +400,44 @@ public class SimpleParallelizer {
 
     return new QueryWorkUnit(rootOperator, rootFragment, fragments);
   }
+
+  /**
+   * Visitor that sets up arriving-fragment accounting by gathering one {@link Collector} per {@link Receiver}.
+   */
+  private static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
+    private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();
+    // Walks the operator tree under root and returns the collectors gathered along the way.
+    public static List<Collector> getCollectors(PhysicalOperator root) {
+      List<Collector> collectors = Lists.newArrayList();
+      root.accept(INSTANCE, collectors);
+      return collectors;
+    }
+    // Describes the receiver as a Collector and appends it; does not descend below the receiver.
+    @Override
+    public Void visitReceiver(Receiver receiver, List<Collector> collectors) throws RuntimeException {
+      List<MinorFragmentEndpoint> endpoints = receiver.getProvidingEndpoints();
+      List<Integer> list = new ArrayList<>(endpoints.size());
+      for (MinorFragmentEndpoint ep : endpoints) {
+        list.add(ep.getId());
+      }
+      // list now holds the id of every providing endpoint, in the order the receiver returned them.
+
+      collectors.add(Collector.newBuilder()
+        .setIsSpooling(receiver.isSpooling())
+        .setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
+        .setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange())
+          .addAllIncomingMinorFragment(list)
+          .build());
+      return null;
+    }
+    // Any other operator: visit each child so receivers deeper in the tree are found.
+    @Override
+    public Void visitOp(PhysicalOperator op, List<Collector> collectors) throws RuntimeException {
+      for (PhysicalOperator o : op) {
+        o.accept(this, collectors);
+      }
+      return null;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 6f16976..407c547 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -18,30 +18,25 @@
 package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.MinorFragmentEndpoint;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
 
 import com.google.common.base.Preconditions;
-import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
 
 public abstract class AbstractDataCollector implements DataCollector{
 
-  private final List<MinorFragmentEndpoint> incoming;
+  // private final List<MinorFragmentEndpoint> incoming;
   private final int oppositeMajorFragmentId;
   private final AtomicIntegerArray remainders;
   private final AtomicInteger remainingRequired;
   private final AtomicInteger parentAccounter;
-
+  private final int incomingStreams;
   protected final RawBatchBuffer[] buffers;
   protected final ArrayWrappedIntIntMap fragmentMap;
 
@@ -52,37 +47,36 @@ public abstract class AbstractDataCollector implements DataCollector{
    * @param bufferCapacity Capacity of each RawBatchBuffer.
    * @param context
    */
-  public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver,
-      final int numBuffers, final int bufferCapacity, FragmentContext context) {
-    Preconditions.checkNotNull(receiver);
+  public AbstractDataCollector(AtomicInteger parentAccounter,
+      final int numBuffers, Collector collector, final int bufferCapacity, FragmentContext context) {
+    Preconditions.checkNotNull(collector);
     Preconditions.checkNotNull(parentAccounter);
 
+    this.incomingStreams = collector.getIncomingMinorFragmentCount();
     this.parentAccounter = parentAccounter;
-    this.incoming = receiver.getProvidingEndpoints();
-    this.remainders = new AtomicIntegerArray(incoming.size());
-    this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
-
+    this.remainders = new AtomicIntegerArray(incomingStreams);
+    this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
     // Create fragmentId to index that is within the range [0, incoming.size()-1]
     // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
     fragmentMap = new ArrayWrappedIntIntMap();
     int index = 0;
-    for(MinorFragmentEndpoint endpoint : incoming) {
-      fragmentMap.put(endpoint.getId(), index);
+    for (Integer endpoint : collector.getIncomingMinorFragmentList()) {
+      fragmentMap.put(endpoint, index);
       index++;
     }
 
     buffers = new RawBatchBuffer[numBuffers];
     remainingRequired = new AtomicInteger(numBuffers);
 
-    final boolean spooling = receiver.isSpooling();
+    final boolean spooling = collector.getIsSpooling();
 
     try {
 
       for (int i = 0; i < numBuffers; i++) {
         if (spooling) {
-          buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i);
+          buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i);
         } else {
-          buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
+          buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId());
         }
       }
     } catch (IOException | OutOfMemoryException e) {
@@ -129,7 +123,7 @@ public abstract class AbstractDataCollector implements DataCollector{
 
   @Override
   public int getTotalIncomingFragments() {
-    return incoming.size();
+    return incomingStreams;
   }
 
   protected abstract RawBatchBuffer getBuffer(int minorFragmentId);

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 421ad7f..8ee7d38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -21,7 +21,6 @@ import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -132,9 +131,7 @@ public class ControlMessageHandler {
             drillbitContext.getFunctionImplementationRegistry());
         final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
         final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
-        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
-            fragment.getFragmentJson());
-        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+        final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener);
         bee.addFragmentRunner(fr);
       } else {
         // isIntermediate, store for incoming data.

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index b0206f7..1c8b066 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -24,9 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,18 +38,24 @@ public class IncomingBuffers implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
 
   private final AtomicInteger streamsRemaining = new AtomicInteger(0);
-  private final AtomicInteger remainingRequired = new AtomicInteger(0);
+  private final AtomicInteger remainingRequired;
   private final Map<Integer, DataCollector> fragCounts;
   private final FragmentContext context;
 
-  public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
+  public IncomingBuffers(PlanFragment fragment, FragmentContext context) {
     this.context = context;
-    Map<Integer, DataCollector> counts = Maps.newHashMap();
-    CountRequiredFragments reqFrags = new CountRequiredFragments();
-    root.accept(reqFrags, counts);
+    Map<Integer, DataCollector> collectors = Maps.newHashMap();
+    remainingRequired = new AtomicInteger(fragment.getCollectorCount());
+    for(int i =0; i < fragment.getCollectorCount(); i++){
+      Collector collector = fragment.getCollector(i);
+      DataCollector newCollector = collector.getSupportsOutOfOrder() ?
+          new MergingCollector(remainingRequired, collector, context) :
+          new PartitionedCollector(remainingRequired, collector, context);
+      collectors.put(collector.getOppositeMajorFragmentId(), newCollector);
+    }
 
-    logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(), counts);
-    fragCounts = ImmutableMap.copyOf(counts);
+    logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(), collectors);
+    fragCounts = ImmutableMap.copyOf(collectors);
 
     // Determine the total number of incoming streams that will need to be completed before we are finished.
     int totalStreams = 0;
@@ -98,34 +103,7 @@ public class IncomingBuffers implements AutoCloseable {
   }
 
 
-  /**
-   * Designed to setup initial values for arriving fragment accounting.
-   */
-  public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, DataCollector>, RuntimeException> {
-
-    @Override
-    public Void visitReceiver(Receiver receiver, Map<Integer, DataCollector> counts) throws RuntimeException {
-      DataCollector set;
-      if (receiver.supportsOutOfOrderExchange()) {
-        set = new MergingCollector(remainingRequired, receiver, context);
-      } else {
-        set = new PartitionedCollector(remainingRequired, receiver, context);
-      }
-
-      counts.put(set.getOppositeMajorFragmentId(), set);
-      remainingRequired.incrementAndGet();
-      return null;
-    }
 
-    @Override
-    public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> value) throws RuntimeException {
-      for (PhysicalOperator o : op) {
-        o.accept(this, value);
-      }
-      return null;
-    }
-
-  }
 
   public boolean isDone() {
     return streamsRemaining.get() < 1;

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 8c09f80..5d13a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.work.batch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
 
 public class MergingCollector extends AbstractDataCollector{
 
-  public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
-    super(parentAccounter, receiver, 1, receiver.getProvidingEndpoints().size(), context);
+  public MergingCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
+    super(parentAccounter, 1, collector, collector.getIncomingMinorFragmentCount(), context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 7ce9074..a334d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.work.batch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
 
 public class PartitionedCollector extends AbstractDataCollector{
 
-  public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
-    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), 1, context);
+  public PartitionedCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
+    super(parentAccounter, collector.getIncomingMinorFragmentCount(), collector, 1, context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index cdf1276..fe267ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -929,15 +929,14 @@ public class Foreman implements Runnable {
     final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
         initiatingClient, drillbitContext.getFunctionImplementationRegistry());
     @SuppressWarnings("resource")
-    final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+    final IncomingBuffers buffers = new IncomingBuffers(rootFragment, rootContext);
     rootContext.setBuffers(buffers);
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    rootRunner = new FragmentExecutor(rootContext, rootOperator,
-        queryManager.newRootStatusHandler(rootContext));
-    final RootFragmentManager fragmentManager =
-        new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+    rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+        rootOperator);
+    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
     if (buffers.isDone()) {
       // if we don't have to wait for any incoming data, start the fragment runner.

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 6b44ae3..8c49d68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -53,10 +54,11 @@ public class FragmentExecutor implements Runnable {
   private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
 
   private final String fragmentName;
-  private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
   private final DeferredException deferredException = new DeferredException();
+  private final PlanFragment fragment;
+  private final FragmentRoot rootOperator;
 
   private volatile RootExec root;
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
@@ -65,11 +67,33 @@ public class FragmentExecutor implements Runnable {
   // Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or finished
   private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
 
-  public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
-                          final StatusReporter listener) {
+  /**
+   * Create a FragmentExecutor where we need to parse and materialize the root operator.
+   * The operator is read from the fragment's JSON lazily, just before the execution tree is built.
+   *
+   * @param context the context of the fragment to execute
+   * @param fragment the plan fragment whose JSON describes the root operator
+   * @param listener the status reporter for this fragment
+   */
+  public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
+      final StatusReporter listener) {
+    this(context, fragment, listener, null);
+  }
+
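+  /**
+   * Create a FragmentExecutor where we already have a root operator in memory.
+   *
+   * @param context the context of the fragment to execute
+   * @param fragment the plan fragment being executed
+   * @param listener the status reporter for this fragment
+   * @param rootOperator the in-memory root operator, or null to parse it from the fragment's JSON before execution
+   */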
+  public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
+      final StatusReporter listener, final FragmentRoot rootOperator) {
     this.fragmentContext = context;
-    this.rootOperator = rootOperator;
     this.listener = listener;
+    this.fragment = fragment;
+    this.rootOperator = rootOperator;
     this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
 
     context.setExecutorState(new ExecutorStateImpl());
@@ -197,6 +221,10 @@ public class FragmentExecutor implements Runnable {
          * fragmentState might have changed even before this method is called e.g. cancel()
          */
         if (shouldContinue()) {
+          // if we didn't get the root operator when the executor was created, create it now.
+          final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+              drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+
           root = ImplCreator.getExec(fragmentContext, rootOperator);
 
           clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);

http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index ca5d5b8..77440c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -42,10 +41,9 @@ import com.google.common.base.Preconditions;
 public class NonRootFragmentManager implements FragmentManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
 
-  private final PlanFragment fragment;
-  private FragmentRoot root;
   private final IncomingBuffers buffers;
   private final FragmentExecutor runner;
+  private final FragmentHandle handle;
   private volatile boolean cancel = false;
   private final FragmentContext context;
   private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
@@ -54,16 +52,15 @@ public class NonRootFragmentManager implements FragmentManager {
   public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
       throws ExecutionSetupException {
     try {
-      this.fragment = fragment;
-      this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      this.handle = fragment.getHandle();
       this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
-      this.buffers = new IncomingBuffers(root, this.context);
+      this.buffers = new IncomingBuffers(fragment, this.context);
       final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
           fragment.getForeman()));
-      this.runner = new FragmentExecutor(this.context, root, reporter);
+      this.runner = new FragmentExecutor(this.context, fragment, reporter);
       this.context.setBuffers(buffers);
 
-    } catch (ForemanException | IOException e) {
+    } catch (ForemanException e) {
       throw new FragmentSetupException("Failure while decoding fragment.", e);
     }
   }
@@ -118,7 +115,7 @@ public class NonRootFragmentManager implements FragmentManager {
 
   @Override
   public FragmentHandle getHandle() {
-    return fragment.getHandle();
+    return handle;
   }
 
   @Override