You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:39 UTC
[02/24] git commit: wip to support op numbering throughout exec.
wip to support op numbering throughout exec.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ebf3d340
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ebf3d340
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ebf3d340
Branch: refs/heads/diagnostics2
Commit: ebf3d340497afeceb93d7e7c8211c5eebfce9ebf
Parents: e14a38c
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri May 16 10:52:15 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 09:11:22 2014 -0700
----------------------------------------------------------------------
.../exec/planner/fragment/Materializer.java | 27 +++++++-----
.../planner/physical/BroadcastExchangePrel.java | 2 +
.../drill/exec/planner/physical/FilterPrel.java | 2 +-
.../exec/planner/physical/HashAggPrel.java | 2 +
.../exec/planner/physical/HashJoinPrel.java | 1 +
.../physical/HashToMergeExchangePrel.java | 1 +
.../physical/HashToRandomExchangePrel.java | 2 +
.../drill/exec/planner/physical/LimitPrel.java | 1 +
.../exec/planner/physical/MergeJoinPrel.java | 1 +
.../planner/physical/PhysicalPlanCreator.java | 14 +++++-
.../exec/planner/physical/ProjectPrel.java | 1 +
.../drill/exec/planner/physical/ScanPrel.java | 1 +
.../drill/exec/planner/physical/ScreenPrel.java | 2 +
.../physical/SelectionVectorRemoverPrel.java | 5 ++-
.../physical/SingleMergeExchangePrel.java | 2 +
.../drill/exec/planner/physical/SortPrel.java | 1 +
.../exec/planner/physical/StreamAggPrel.java | 2 +-
.../drill/exec/planner/physical/TopNPrel.java | 1 +
.../planner/physical/UnionExchangePrel.java | 1 +
.../drill/exec/planner/physical/WriterPrel.java | 5 ++-
.../planner/physical/explain/PrelSequencer.java | 46 +++++++++++++++-----
.../planner/sql/handlers/DefaultSqlHandler.java | 3 +-
.../apache/drill/exec/TestOpSerialization.java | 10 +++--
23 files changed, 102 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 87078a2..ef9146a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -30,21 +30,23 @@ import com.google.common.collect.Lists;
public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class);
-
+
@Override
public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException {
iNode.addAllocation(exchange);
if(exchange == iNode.getNode().getSendingExchange()){
-
+
// this is a sending exchange.
PhysicalOperator child = exchange.getChild().accept(this, iNode);
PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
+ materializedSender.setOperatorId(0);
// logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
return materializedSender;
-
+
}else{
// receiving exchange.
PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
+ materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId());
// logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
return materializedReceiver;
}
@@ -52,7 +54,9 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException {
- return groupScan.getSpecificScan(iNode.getMinorFragmentId());
+ PhysicalOperator child = groupScan.getSpecificScan(iNode.getMinorFragmentId());
+ child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId());
+ return child;
}
@Override
@@ -67,9 +71,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
PhysicalOperator child = store.getChild().accept(this, iNode);
iNode.addAllocation(store);
-
+
try {
PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
+ o.setOperatorId(Short.MAX_VALUE & store.getOperatorId());
// logger.debug("New materialized store node {} with child {}", o, child);
return o;
} catch (PhysicalOperatorSetupException e) {
@@ -85,13 +90,15 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
for(PhysicalOperator child : op){
children.add(child.accept(this, iNode));
}
- return op.getNewWithChildren(children);
+ PhysicalOperator newOp = op.getNewWithChildren(children);
+ newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
+ return newOp;
}
-
+
public static class IndexedFragmentNode{
final Wrapper info;
final int minorFragmentId;
-
+
public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
super();
this.info = info;
@@ -113,7 +120,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
public void addAllocation(PhysicalOperator pop) {
info.addAllocation(pop);
}
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index e0f3ee1..8b1c720 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -89,6 +89,8 @@ public class BroadcastExchangePrel extends ExchangePrel{
}
BroadcastExchange g = new BroadcastExchange(childPOP);
+ g.setOperatorId(creator.getOperatorId(this));
+
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 8420e08..9632911 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -56,7 +56,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f);
-
+ p.setOperatorId(creator.getOperatorId(this));
return p;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..6377e35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -110,6 +110,8 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
exprs.toArray(new NamedExpression[exprs.size()]),
1.0f);
+ g.setOperatorId(creator.getOperatorId(this));
+
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 1a528d5..87da31e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -106,6 +106,7 @@ public class HashJoinPrel extends DrillJoinRelBase implements Prel {
}
HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
+ hjoin.setOperatorId(creator.getOperatorId(this));
return hjoin;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
index 262fd8c..0539a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
@@ -90,6 +90,7 @@ public class HashToMergeExchangePrel extends ExchangePrel {
HashToMergeExchange g = new HashToMergeExchange(childPOP,
PrelUtil.getHashExpression(this.distFields, getChild().getRowType()),
PrelUtil.getOrdering(this.collation, getChild().getRowType()));
+ g.setOperatorId(creator.getOperatorId(this));
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index ec9ed79..a5699cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -94,6 +94,8 @@ public class HashToRandomExchangePrel extends ExchangePrel {
if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType()));
+ g.setOperatorId(creator.getOperatorId(this));
+
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5986fde..794593a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -65,6 +65,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
Limit limit = new Limit(childPOP, first, last);
+ limit.setOperatorId(creator.getOperatorId(this));
return limit;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index fe03c40..400c6a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -110,6 +110,7 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel {
}
MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype);
+ mjoin.setOperatorId(creator.getOperatorId(this));
return mjoin;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
index 9ac07d3..f4189e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
@@ -27,20 +28,24 @@ import org.apache.drill.common.logical.PlanProperties.PlanType;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
import com.google.common.collect.Lists;
+import com.google.hive12.common.collect.Maps;
public class PhysicalPlanCreator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanCreator.class);
+ private final Map<Prel, OpId> opIdMap;
+
private List<PhysicalOperator> popList;
private final QueryContext context;
PhysicalPlan plan = null;
- public PhysicalPlanCreator(QueryContext context) {
+ public PhysicalPlanCreator(QueryContext context, Map<Prel, OpId> opIdMap) {
this.context = context;
+ this.opIdMap = opIdMap;
popList = Lists.newArrayList();
}
@@ -48,6 +53,11 @@ public class PhysicalPlanCreator {
return context;
}
+ public int getOperatorId(Prel prel){
+ OpId id = opIdMap.get(prel);
+ return id.getAsSingleInt();
+ }
+
public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) {
if (plan != null && !forceRebuild) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 1aa34d3..70dca25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -57,6 +57,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
Project p = new Project(this.getProjectExpressions(new DrillParseContext()), childPOP);
+ p.setOperatorId(creator.getOperatorId(this));
return p;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 74cd7a9..8461e24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -73,6 +73,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
throws IOException {
+ groupScan.setOperatorId(creator.getOperatorId(this));
return groupScan;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index 36bf796..d02ed44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -51,6 +51,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel {
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint());
+ s.setOperatorId(creator.getOperatorId(this));
+
return s;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index 63cdcaa..fd07749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -32,7 +32,10 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- return new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator));
+ SelectionVectorRemover r = new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator));
+ r.setOperatorId(creator.getOperatorId(this));
+ return r;
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
index 05d6e89..99d99a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
@@ -89,6 +89,8 @@ public class SingleMergeExchangePrel extends ExchangePrel {
if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
SingleMergeExchange g = new SingleMergeExchange(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()));
+ g.setOperatorId(creator.getOperatorId(this));
+
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index d582bc6..fa5e900 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -73,6 +73,7 @@ public class SortPrel extends SortRel implements Prel {
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false);
+ g.setOperatorId(creator.getOperatorId(this));
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..a95d926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -109,7 +109,7 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
Prel child = (Prel) this.getChild();
StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
-
+ g.setOperatorId(creator.getOperatorId(this));
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 3c8cfe0..0067926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -61,6 +61,7 @@ public class TopNPrel extends SinglePrel {
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false, this.limit);
+ topN.setOperatorId(creator.getOperatorId(this));
return topN;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
index 5d6b85d..f14df72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
@@ -81,6 +81,7 @@ public class UnionExchangePrel extends ExchangePrel {
if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
UnionExchange g = new UnionExchange(childPOP);
+ g.setOperatorId(creator.getOperatorId(this));
return g;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index 4cefeb5..e948125 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -43,7 +43,10 @@ public class WriterPrel extends DrillWriterRelBase implements Prel {
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getChild();
- return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+ PhysicalOperator g = getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+ g.setOperatorId(creator.getOperatorId(this));
+
+ return g;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
index 169deca..2ab6c74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
@@ -42,14 +42,17 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt
if (rel == null) {
return null;
}
- PrelSequencer s = new PrelSequencer();
final StringWriter sw = new StringWriter();
- final RelWriter planWriter = new NumberingRelWriter(s.go(rel), new PrintWriter(sw), explainlevel);
+ final RelWriter planWriter = new NumberingRelWriter(getIdMap(rel), new PrintWriter(sw), explainlevel);
rel.explain(planWriter);
return sw.toString();
}
+ public static Map<Prel, OpId> getIdMap(Prel rel){
+ PrelSequencer s = new PrelSequencer();
+ return s.go(rel);
+ }
static class Frag implements Iterable<Frag>{
@@ -110,7 +113,7 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt
}
- static class OpId{
+ public static class OpId{
int fragmentId;
int opId;
public OpId(int fragmentId, int opId) {
@@ -118,6 +121,21 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt
this.fragmentId = fragmentId;
this.opId = opId;
}
+
+
+ public int getFragmentId() {
+ return fragmentId;
+ }
+
+
+ public int getOpId() {
+ return opId;
+ }
+
+ public int getAsSingleInt(){
+ return (fragmentId << 16) + opId;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -172,19 +190,27 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt
}
// for each fragment, do a dfs of operators to assign operator ids.
- Map<Prel, OpId> ids = Maps.newHashMap();
+ Map<Prel, OpId> ids = Maps.newIdentityHashMap();
+
+ ids.put(rootFrag.root, new OpId(0, 0));
for(Frag f : frags){
- int id = 0;
+ int id = 1;
Queue<Prel> ops = Lists.newLinkedList();
ops.add(f.root);
while(!ops.isEmpty()){
Prel p = ops.remove();
- if(p instanceof ExchangePrel && p != f.root) continue;
- ids.put(p, new OpId(f.majorFragmentId, id++) );
+ boolean isExchange = p instanceof ExchangePrel;
+
+ if(p != f.root){ // we account for exchanges as receviers to guarantee unique identifiers.
+ ids.put(p, new OpId(f.majorFragmentId, id++) );
+ }
+
- List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator()));
- for(Prel child : children){
- ops.add(child);
+ if(!isExchange || p == f.root){
+ List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator()));
+ for(Prel child : children){
+ ops.add(child);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 1cb3cfb..b06432a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -132,8 +132,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
protected PhysicalOperator convertToPop(Prel prel) throws IOException {
-
- PhysicalPlanCreator creator = new PhysicalPlanCreator(context);
+ PhysicalPlanCreator creator = new PhysicalPlanCreator(context, PrelSequencer.getIdMap(prel));
PhysicalOperator op = prel.getPhysicalOperator(creator);
return op;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 3040de2..ad1d6b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -12,6 +12,7 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
@@ -27,10 +28,12 @@ public class TestOpSerialization {
DrillConfig c = DrillConfig.create();
PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
MockSubScanPOP s = new MockSubScanPOP("abc", null);
- s.setOperatorId(2);
+ s.setOperatorId(3);
Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f);
- f.setOperatorId(1);
- Screen screen = new Screen(f, CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ f.setOperatorId(2);
+ UnionExchange e = new UnionExchange(f);
+ e.setOperatorId(1);
+ Screen screen = new Screen(e, CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
screen.setOperatorId(0);
boolean reversed = false;
@@ -38,6 +41,7 @@ public class TestOpSerialization {
List<PhysicalOperator> pops = Lists.newArrayList();
pops.add(s);
+ pops.add(e);
pops.add(f);
pops.add(screen);