You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/14 06:42:08 UTC
[3/8] drill git commit: DRILL-3079: Move execution fragment json
parsing from RPC message to fragment start.
DRILL-3079: Move execution fragment json parsing from RPC message to fragment start.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ad42611
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ad42611
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ad42611
Branch: refs/heads/master
Commit: 4ad426117512b1a6ec65ab15274ebc33db33b5a8
Parents: 814f553
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 13 12:55:01 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 13 19:33:55 2015 -0700
----------------------------------------------------------------------
.../planner/fragment/SimpleParallelizer.java | 55 +-
.../exec/work/batch/AbstractDataCollector.java | 38 +-
.../exec/work/batch/ControlMessageHandler.java | 5 +-
.../drill/exec/work/batch/IncomingBuffers.java | 52 +-
.../drill/exec/work/batch/MergingCollector.java | 6 +-
.../exec/work/batch/PartitionedCollector.java | 6 +-
.../apache/drill/exec/work/foreman/Foreman.java | 9 +-
.../exec/work/fragment/FragmentExecutor.java | 36 +-
.../work/fragment/NonRootFragmentManager.java | 15 +-
.../org/apache/drill/exec/proto/BitControl.java | 1531 +++++++++++++++---
.../drill/exec/proto/SchemaBitControl.java | 141 ++
.../drill/exec/proto/beans/Collector.java | 239 +++
.../drill/exec/proto/beans/PlanFragment.java | 34 +
protocol/src/main/protobuf/BitControl.proto | 8 +
14 files changed, 1870 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index d36ad42..e04a8a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.fragment;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -26,22 +27,22 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Ordering;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -54,8 +55,12 @@ import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
/**
* The simple parallelizer determines the level of parallelization of a plan based on the cost of the underlying
@@ -367,6 +372,7 @@ public class SimpleParallelizer {
.setMinorFragmentId(minorFragmentId) //
.setQueryId(queryId) //
.build();
+
PlanFragment fragment = PlanFragment.newBuilder() //
.setForeman(foremanNode) //
.setFragmentJson(plan) //
@@ -378,6 +384,7 @@ public class SimpleParallelizer {
.setMemMax(wrapper.getMaxAllocation())
.setOptionsJson(optionsData)
.setCredentials(session.getCredentials())
+ .addAllCollector(CountRequiredFragments.getCollectors(root))
.build();
if (isRootNode) {
@@ -393,4 +400,44 @@ public class SimpleParallelizer {
return new QueryWorkUnit(rootOperator, rootFragment, fragments);
}
+
+ /**
+ * Designed to set up initial values for arriving fragment accounting.
+ */
+ private static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
+ private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();
+
+ public static List<Collector> getCollectors(PhysicalOperator root) {
+ List<Collector> collectors = Lists.newArrayList();
+ root.accept(INSTANCE, collectors);
+ return collectors;
+ }
+
+ @Override
+ public Void visitReceiver(Receiver receiver, List<Collector> collectors) throws RuntimeException {
+ List<MinorFragmentEndpoint> endpoints = receiver.getProvidingEndpoints();
+ List<Integer> list = new ArrayList<>(endpoints.size());
+ for (MinorFragmentEndpoint ep : endpoints) {
+ list.add(ep.getId());
+ }
+
+
+ collectors.add(Collector.newBuilder()
+ .setIsSpooling(receiver.isSpooling())
+ .setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
+ .setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange())
+ .addAllIncomingMinorFragment(list)
+ .build());
+ return null;
+ }
+
+ @Override
+ public Void visitOp(PhysicalOperator op, List<Collector> collectors) throws RuntimeException {
+ for (PhysicalOperator o : op) {
+ o.accept(this, collectors);
+ }
+ return null;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 6f16976..407c547 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -18,30 +18,25 @@
package org.apache.drill.exec.work.batch;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.MinorFragmentEndpoint;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
import com.google.common.base.Preconditions;
-import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
public abstract class AbstractDataCollector implements DataCollector{
- private final List<MinorFragmentEndpoint> incoming;
+ // private final List<MinorFragmentEndpoint> incoming;
private final int oppositeMajorFragmentId;
private final AtomicIntegerArray remainders;
private final AtomicInteger remainingRequired;
private final AtomicInteger parentAccounter;
-
+ private final int incomingStreams;
protected final RawBatchBuffer[] buffers;
protected final ArrayWrappedIntIntMap fragmentMap;
@@ -52,37 +47,36 @@ public abstract class AbstractDataCollector implements DataCollector{
* @param bufferCapacity Capacity of each RawBatchBuffer.
* @param context
*/
- public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver,
- final int numBuffers, final int bufferCapacity, FragmentContext context) {
- Preconditions.checkNotNull(receiver);
+ public AbstractDataCollector(AtomicInteger parentAccounter,
+ final int numBuffers, Collector collector, final int bufferCapacity, FragmentContext context) {
+ Preconditions.checkNotNull(collector);
Preconditions.checkNotNull(parentAccounter);
+ this.incomingStreams = collector.getIncomingMinorFragmentCount();
this.parentAccounter = parentAccounter;
- this.incoming = receiver.getProvidingEndpoints();
- this.remainders = new AtomicIntegerArray(incoming.size());
- this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
-
+ this.remainders = new AtomicIntegerArray(incomingStreams);
+ this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
// Create fragmentId to index that is within the range [0, incoming.size()-1]
// We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
fragmentMap = new ArrayWrappedIntIntMap();
int index = 0;
- for(MinorFragmentEndpoint endpoint : incoming) {
- fragmentMap.put(endpoint.getId(), index);
+ for (Integer endpoint : collector.getIncomingMinorFragmentList()) {
+ fragmentMap.put(endpoint, index);
index++;
}
buffers = new RawBatchBuffer[numBuffers];
remainingRequired = new AtomicInteger(numBuffers);
- final boolean spooling = receiver.isSpooling();
+ final boolean spooling = collector.getIsSpooling();
try {
for (int i = 0; i < numBuffers; i++) {
if (spooling) {
- buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i);
+ buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i);
} else {
- buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
+ buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId());
}
}
} catch (IOException | OutOfMemoryException e) {
@@ -129,7 +123,7 @@ public abstract class AbstractDataCollector implements DataCollector{
@Override
public int getTotalIncomingFragments() {
- return incoming.size();
+ return incomingStreams;
}
protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 421ad7f..8ee7d38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -21,7 +21,6 @@ import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -132,9 +131,7 @@ public class ControlMessageHandler {
drillbitContext.getFunctionImplementationRegistry());
final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
- fragment.getFragmentJson());
- final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+ final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener);
bee.addFragmentRunner(fr);
} else {
// isIntermediate, store for incoming data.
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index b0206f7..1c8b066 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -24,9 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.record.RawFragmentBatch;
import com.google.common.collect.ImmutableMap;
@@ -39,18 +38,24 @@ public class IncomingBuffers implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
private final AtomicInteger streamsRemaining = new AtomicInteger(0);
- private final AtomicInteger remainingRequired = new AtomicInteger(0);
+ private final AtomicInteger remainingRequired;
private final Map<Integer, DataCollector> fragCounts;
private final FragmentContext context;
- public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
+ public IncomingBuffers(PlanFragment fragment, FragmentContext context) {
this.context = context;
- Map<Integer, DataCollector> counts = Maps.newHashMap();
- CountRequiredFragments reqFrags = new CountRequiredFragments();
- root.accept(reqFrags, counts);
+ Map<Integer, DataCollector> collectors = Maps.newHashMap();
+ remainingRequired = new AtomicInteger(fragment.getCollectorCount());
+ for(int i =0; i < fragment.getCollectorCount(); i++){
+ Collector collector = fragment.getCollector(i);
+ DataCollector newCollector = collector.getSupportsOutOfOrder() ?
+ new MergingCollector(remainingRequired, collector, context) :
+ new PartitionedCollector(remainingRequired, collector, context);
+ collectors.put(collector.getOppositeMajorFragmentId(), newCollector);
+ }
- logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), counts);
- fragCounts = ImmutableMap.copyOf(counts);
+ logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), collectors);
+ fragCounts = ImmutableMap.copyOf(collectors);
// Determine the total number of incoming streams that will need to be completed before we are finished.
int totalStreams = 0;
@@ -98,34 +103,7 @@ public class IncomingBuffers implements AutoCloseable {
}
- /**
- * Designed to setup initial values for arriving fragment accounting.
- */
- public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, DataCollector>, RuntimeException> {
-
- @Override
- public Void visitReceiver(Receiver receiver, Map<Integer, DataCollector> counts) throws RuntimeException {
- DataCollector set;
- if (receiver.supportsOutOfOrderExchange()) {
- set = new MergingCollector(remainingRequired, receiver, context);
- } else {
- set = new PartitionedCollector(remainingRequired, receiver, context);
- }
-
- counts.put(set.getOppositeMajorFragmentId(), set);
- remainingRequired.incrementAndGet();
- return null;
- }
- @Override
- public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> value) throws RuntimeException {
- for (PhysicalOperator o : op) {
- o.accept(this, value);
- }
- return null;
- }
-
- }
public boolean isDone() {
return streamsRemaining.get() < 1;
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 8c09f80..5d13a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
public class MergingCollector extends AbstractDataCollector{
- public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
- super(parentAccounter, receiver, 1, receiver.getProvidingEndpoints().size(), context);
+ public MergingCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
+ super(parentAccounter, 1, collector, collector.getIncomingMinorFragmentCount(), context);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 7ce9074..a334d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.BitControl.Collector;
public class PartitionedCollector extends AbstractDataCollector{
- public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
- super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), 1, context);
+ public PartitionedCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
+ super(parentAccounter, collector.getIncomingMinorFragmentCount(), collector, 1, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index cdf1276..fe267ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -929,15 +929,14 @@ public class Foreman implements Runnable {
final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
initiatingClient, drillbitContext.getFunctionImplementationRegistry());
@SuppressWarnings("resource")
- final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+ final IncomingBuffers buffers = new IncomingBuffers(rootFragment, rootContext);
rootContext.setBuffers(buffers);
queryManager.addFragmentStatusTracker(rootFragment, true);
- rootRunner = new FragmentExecutor(rootContext, rootOperator,
- queryManager.newRootStatusHandler(rootContext));
- final RootFragmentManager fragmentManager =
- new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+ rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
+ rootOperator);
+ final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
if (buffers.isDone()) {
// if we don't have to wait for any incoming data, start the fragment runner.
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 6b44ae3..8c49d68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -53,10 +54,11 @@ public class FragmentExecutor implements Runnable {
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
private final String fragmentName;
- private final FragmentRoot rootOperator;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
private final DeferredException deferredException = new DeferredException();
+ private final PlanFragment fragment;
+ private final FragmentRoot rootOperator;
private volatile RootExec root;
private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
@@ -65,11 +67,33 @@ public class FragmentExecutor implements Runnable {
// Thread that is currently executing the Fragment. Value is null if the fragment hasn't started running or finished
private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
- public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
- final StatusReporter listener) {
+ /**
+ * Create a FragmentExecutor where we need to parse and materialize the root operator.
+ * The root operator is read from the fragment's JSON later, when the fragment starts running.
+ *
+ * @param context
+ * @param fragment
+ * @param listener
+ */
+ public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
+ final StatusReporter listener) {
+ this(context, fragment, listener, null);
+ }
+
+ /**
+ * Create a FragmentExecutor where we already have a root operator in memory.
+ *
+ * @param context
+ * @param fragment
+ * @param listener
+ * @param rootOperator
+ */
+ public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
+ final StatusReporter listener, final FragmentRoot rootOperator) {
this.fragmentContext = context;
- this.rootOperator = rootOperator;
this.listener = listener;
+ this.fragment = fragment;
+ this.rootOperator = rootOperator;
this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
context.setExecutorState(new ExecutorStateImpl());
@@ -197,6 +221,10 @@ public class FragmentExecutor implements Runnable {
* fragmentState might have changed even before this method is called e.g. cancel()
*/
if (shouldContinue()) {
+ // if we didn't get the root operator when the executor was created, create it now.
+ final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+ drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+
root = ImplCreator.getExec(fragmentContext, rootOperator);
clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
http://git-wip-us.apache.org/repos/asf/drill/blob/4ad42611/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index ca5d5b8..77440c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -42,10 +41,9 @@ import com.google.common.base.Preconditions;
public class NonRootFragmentManager implements FragmentManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
- private final PlanFragment fragment;
- private FragmentRoot root;
private final IncomingBuffers buffers;
private final FragmentExecutor runner;
+ private final FragmentHandle handle;
private volatile boolean cancel = false;
private final FragmentContext context;
private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
@@ -54,16 +52,15 @@ public class NonRootFragmentManager implements FragmentManager {
public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
throws ExecutionSetupException {
try {
- this.fragment = fragment;
- this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+ this.handle = fragment.getHandle();
this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
- this.buffers = new IncomingBuffers(root, this.context);
+ this.buffers = new IncomingBuffers(fragment, this.context);
final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
fragment.getForeman()));
- this.runner = new FragmentExecutor(this.context, root, reporter);
+ this.runner = new FragmentExecutor(this.context, fragment, reporter);
this.context.setBuffers(buffers);
- } catch (ForemanException | IOException e) {
+ } catch (ForemanException e) {
throw new FragmentSetupException("Failure while decoding fragment.", e);
}
}
@@ -118,7 +115,7 @@ public class NonRootFragmentManager implements FragmentManager {
@Override
public FragmentHandle getHandle() {
- return fragment.getHandle();
+ return handle;
}
@Override