You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:44 UTC
[47/50] [abbrv] git commit: DAG-execplan
DAG-execplan
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bc50ff78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bc50ff78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bc50ff78
Branch: refs/heads/DAG-execplan
Commit: bc50ff789c6b5c2b17fa73a02a40a3717121eede
Parents: 4537700
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Dec 27 19:30:56 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Dec 27 19:30:56 2013 +0900
----------------------------------------------------------------------
.../engine/planner/PhysicalPlannerImpl.java | 9 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 15 +-
.../tajo/engine/planner/global/DataChannel.java | 13 +-
.../engine/planner/global/ExecutionPlan.java | 110 ++++-----
.../engine/planner/global/GlobalPlanner.java | 243 ++++++++-----------
.../tajo/engine/planner/global/MasterPlan.java | 3 +-
.../planner/graph/SimpleDirectedGraph.java | 5 +-
.../tajo/master/querymaster/SubQuery.java | 3 +-
.../planner/physical/TestPhysicalPlanner.java | 6 +-
9 files changed, 178 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 19a6574..d1f34c8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -128,7 +128,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
LogicalNode root = plan.getTerminalNode();
List<DataChannel> channels = context.getOutgoingChannels();
for (DataChannel channel : channels) {
- LogicalNode node = plan.getTopNodeFromPID(channel.getSrcPID());
+ LogicalNode node = plan.getPlanGroupWithPID(channel.getSrcPID()).getRootNode();
if (node.getType() != NodeType.STORE) {
StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
@@ -140,9 +140,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
storeTableNode.setDefaultParition();
}
- plan.remove(node, root);
- plan.add(node, storeTableNode, EdgeType.SINGLE);
- plan.add(storeTableNode, root, EdgeType.SINGLE);
+ LogicalNode topNode = plan.getFirstPlanGroup().toLinkedLogicalNode();
+ storeTableNode.setChild(topNode);
+ plan.setPlan(storeTableNode);
+
channel.updateSrcPID(storeTableNode.getPID());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index e9bb071..5164b65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -492,13 +492,18 @@ public class PlannerUtil {
Preconditions.checkNotNull(executionPlan);
Preconditions.checkNotNull(type);
- LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
- finder.find();
-
- if (finder.getFoundNodes().size() == 0) {
+ if (executionPlan.hasPlanGroup()) {
+ return findTopNode(executionPlan.getFirstPlanGroup().toLinkedLogicalNode(), type);
+ } else {
return null;
}
- return (T) finder.getFoundNodes().get(0);
+// LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
+// finder.find();
+//
+// if (finder.getFoundNodes().size() == 0) {
+// return null;
+// }
+// return (T) finder.getFoundNodes().get(0);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index a8eb1c3..98ee8c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.planner.global.ExecutionPlan.LogicalNodeGroup;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.PlanGroup;
import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -49,9 +49,8 @@ public class DataChannel {
this.partitionType = partitionType;
}
- public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum, Schema schema) {
+ public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
this(src.getId(), target.getId(), partitionType, partNum);
- setSchema(schema);
}
public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
@@ -235,10 +234,12 @@ public class DataChannel {
return targetId.getPid();
}
- public static DataChannel linkChannelAndLogicalNodeGroups(LogicalNodeGroup src, LogicalNodeGroup target,
- DataChannel channel) {
+ public static DataChannel linkChannelAndPlanGroups(PlanGroup src, PlanGroup target,
+ DataChannel channel) {
channel.srcId.setPID(src.getId());
- channel.targetId.setPID(target.getId());
+ if (target != null) {
+ channel.targetId.setPID(target.getId());
+ }
return channel;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index 614d128..a3506d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -45,18 +45,18 @@ public class ExecutionPlan implements GsonObject {
@Expose private boolean hasUnionPlan;
@Expose private boolean hasJoinPlan;
@Expose private LogicalRootNode terminalNode;
- @Expose private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
- @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
+ private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+ private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
= new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
- private NavigableMap<Integer, LogicalNodeGroup> logicalNodeGroups = Maps.newTreeMap();
+ @Expose private NavigableMap<Integer, PlanGroup> planGroups = Maps.newTreeMap();
private boolean built = false;
- public static class LogicalNodeGroup {
- private int rootPID;
- private List<LogicalNode> nodes = Lists.newArrayList(); // order: root -> leaf
+ public static class PlanGroup {
+ @Expose private int rootPID;
+ @Expose private List<LogicalNode> nodes = Lists.newArrayList(); // order: root -> leaf
- public LogicalNodeGroup(int rootPID) {
+ public PlanGroup(int rootPID) {
setId(rootPID);
}
@@ -71,12 +71,12 @@ public class ExecutionPlan implements GsonObject {
public void addNodeAndDescendants(LogicalNode logicalNode) {
add(logicalNode);
if (logicalNode instanceof UnaryNode) {
- add(((UnaryNode) logicalNode).getChild());
+ addNodeAndDescendants(((UnaryNode) logicalNode).getChild());
} else if (logicalNode instanceof BinaryNode) {
- add(((BinaryNode) logicalNode).getLeftChild());
- add(((BinaryNode) logicalNode).getRightChild());
+ addNodeAndDescendants(((BinaryNode) logicalNode).getLeftChild());
+ addNodeAndDescendants(((BinaryNode) logicalNode).getRightChild());
} else if (logicalNode instanceof TableSubQueryNode) {
- add(((TableSubQueryNode) logicalNode).getSubQuery());
+ addNodeAndDescendants(((TableSubQueryNode) logicalNode).getSubQuery());
}
}
@@ -87,7 +87,7 @@ public class ExecutionPlan implements GsonObject {
public LogicalNode toLinkedLogicalNode() {
LogicalNode[] nodes = this.nodes.toArray(new LogicalNode[this.nodes.size()]);
- for (int i = 0; i < nodes.length; i++) {
+ for (int i = 0; i < nodes.length;) {
if (nodes[i] instanceof UnaryNode) {
((UnaryNode)nodes[i]).setChild(nodes[++i]);
} else if (nodes[i] instanceof BinaryNode) {
@@ -95,6 +95,8 @@ public class ExecutionPlan implements GsonObject {
((BinaryNode)nodes[i]).setRightChild(nodes[++i]);
} else if (nodes[i] instanceof TableSubQueryNode) {
((TableSubQueryNode)nodes[i]).setSubQuery(nodes[++i]);
+ } else {
+ i++;
}
}
return nodes[0];
@@ -131,10 +133,7 @@ public class ExecutionPlan implements GsonObject {
for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
graph.removeEdge(edge.getChildId(), edge.getParentId());
}
- for (LogicalNodeGroup eachGroup : logicalNodeGroups.values()) {
- eachGroup.clear();
- }
- logicalNodeGroups.clear();
+ planGroups.clear();
vertices.clear();
this.inputContext = null;
this.hasUnionPlan = false;
@@ -152,17 +151,33 @@ public class ExecutionPlan implements GsonObject {
}
// add group
- LogicalNodeGroup nodeGroup = new LogicalNodeGroup(topNode.getPID());
+ PlanGroup nodeGroup = new PlanGroup(topNode.getPID());
nodeGroup.addNodeAndDescendants(topNode);
- logicalNodeGroups.put(nodeGroup.rootPID, nodeGroup);
+ planGroups.put(nodeGroup.rootPID, nodeGroup);
+ }
+
+ public PlanGroup remoteLogicalNodeGroup(int pid) {
+ // TODO: improve the code
+ PlanGroup removed = planGroups.remove(pid);
+ Collection<PlanGroup> remain = planGroups.values();
+ clear();
+ for (PlanGroup planGroup : remain) {
+ addPlan(planGroup.toLinkedLogicalNode());
+ }
+ build();
+ return removed;
+ }
+
+ public PlanGroup getPlanGroupWithPID(int pid) {
+ return planGroups.get(pid);
}
- public LogicalNodeGroup getLogicalNodeGroupWithPID(int pid) {
- return logicalNodeGroups.get(pid);
+ public PlanGroup getFirstPlanGroup() {
+ return planGroups.firstEntry().getValue();
}
- public LogicalNodeGroup getFirstLogicalNodeGroup() {
- return logicalNodeGroups.firstEntry().getValue();
+ public boolean hasPlanGroup() {
+ return planGroups.size() > 0;
}
public void build() {
@@ -173,8 +188,8 @@ public class ExecutionPlan implements GsonObject {
ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
- for (LogicalNodeGroup logicalNodeGroup : logicalNodeGroups.values()) {
- LogicalNode topNode = logicalNodeGroup.nodes.iterator().next();
+ for (PlanGroup planGroup : planGroups.values()) {
+ LogicalNode topNode = planGroup.nodes.iterator().next();
builder.visit(topNode);
this.add(topNode, terminalNode, EdgeType.SINGLE);
}
@@ -238,6 +253,10 @@ public class ExecutionPlan implements GsonObject {
return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
}
+ public Schema getOutSchemaWithPID(int pid) {
+ return planGroups.get(pid).getRootNode().getOutSchema();
+ }
+
@Override
public boolean equals(Object o) {
if (o instanceof ExecutionPlan) {
@@ -266,12 +285,11 @@ public class ExecutionPlan implements GsonObject {
}
public LogicalNode getTopNodeFromPID(int pid) {
- for (Integer childId : graph.getChilds(terminalNode.getPID())) {
- if (childId == pid) {
- return vertices.get(childId);
- }
+ if (planGroups.containsKey(pid)) {
+ return planGroups.get(pid).getRootNode();
+ } else {
+ return null;
}
- return null;
}
public int getChildCount(LogicalNode node) {
@@ -318,33 +336,15 @@ public class ExecutionPlan implements GsonObject {
@Expose private final boolean hasUnionPlan;
@Expose private final InputContext inputContext;
@Expose private final LogicalRootNode terminalNode;
- @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
- @Expose Map<Integer, List<PIDAndEdgeType>> adjacentList = new HashMap<Integer, List<PIDAndEdgeType>>();
+ @Expose private final Map<Integer, PlanGroup> planGroups;
public ExecutionPlanJsonHelper(ExecutionPlan plan) {
this.pidFactory = plan.pidFactory;
this.hasJoinPlan = plan.hasJoinPlan;
this.hasUnionPlan = plan.hasUnionPlan;
- this.inputContext = plan.getInputContext();
+ this.inputContext = plan.inputContext;
this.terminalNode = plan.terminalNode;
- this.vertices.putAll(plan.vertices);
- Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
- int parentId, childId;
- List<PIDAndEdgeType> adjacents;
-
- // convert the graph to an adjacent list
- for (ExecutionPlanEdge edge : edges) {
- childId = edge.getChildId();
- parentId = edge.getParentId();
-
- if (adjacentList.containsKey(childId)) {
- adjacents = adjacentList.get(childId);
- } else {
- adjacents = new ArrayList<PIDAndEdgeType>();
- adjacentList.put(childId, adjacents);
- }
- adjacents.add(new PIDAndEdgeType(parentId, edge.getEdgeType()));
- }
+ this.planGroups = plan.planGroups;
}
@Override
@@ -358,14 +358,10 @@ public class ExecutionPlan implements GsonObject {
plan.hasJoinPlan = this.hasJoinPlan;
plan.hasUnionPlan = this.hasUnionPlan;
plan.setInputContext(this.inputContext);
- plan.vertices.putAll(this.vertices);
-
- for (Entry<Integer, List<PIDAndEdgeType>> e : this.adjacentList.entrySet()) {
- LogicalNode child = this.vertices.get(e.getKey());
- for (PIDAndEdgeType pidAndEdgeType : e.getValue()) {
- plan.add(child, this.vertices.get(pidAndEdgeType.id), pidAndEdgeType.edgeType);
- }
+ for (PlanGroup planGroup : planGroups.values()) {
+ plan.addPlan(planGroup.toLinkedLogicalNode());
}
+ plan.build();
return plan;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index efa4259..912f367 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -25,25 +25,22 @@ import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.PlanGroup;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
import java.util.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.NONE_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
/**
* Build DAG
@@ -84,11 +81,14 @@ public class GlobalPlanner {
ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
if (childExecBlock.getPlan() != null) {
- childExecBlock.getPlan().build();
ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
- DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, lastNode.getPID(), -1, NONE_PARTITION, 1,
- lastNode.getOutSchema());
+ DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
+ dataChannel.setSchema(lastNode.getOutSchema());
+ if (childExecBlock.getPlan().hasPlanGroup()) {
+ DataChannel.linkChannelAndPlanGroups(childExecBlock.getPlan().getFirstPlanGroup(),
+ null, dataChannel);
+ }
masterPlan.addConnect(dataChannel);
masterPlan.setTerminal(terminalBlock);
} else {
@@ -98,14 +98,6 @@ public class GlobalPlanner {
LOG.info(masterPlan);
}
- public static ScanNode buildInputExecutor(LogicalPlan plan, Schema schema, ExecutionBlockId srcId, ExecutionBlockId targetId, StoreType storeType) {
- Preconditions.checkArgument(schema != null,
- "Channel schema (" + srcId +" -> "+ targetId + ") is not initialized");
- TableMeta meta = new TableMeta(storeType, new Options());
- TableDesc desc = new TableDesc(srcId.toString(), schema, meta, new Path("/"));
- return new ScanNode(plan.newPID(), desc);
- }
-
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -116,31 +108,12 @@ public class GlobalPlanner {
private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
ExecutionBlock parent, JoinNode join, boolean leftTable) {
- ExecutionBlock childBlock;
- int targetPid;
- if (leftTable) {
- childBlock = leftBlock;
- if (!childBlock.getPlan().isBuilt()) {
- childBlock.getPlan().build();
- }
- targetPid = parent.getInputContext().getScanNodes()[0].getPID();
- } else {
- childBlock = rightBlock;
- if (!childBlock.getPlan().isBuilt()) {
- childBlock.getPlan().build();
- }
- targetPid = parent.getInputContext().getScanNodes()[1].getPID();
- }
- LogicalNode srcTopNode = childBlock.getPlan().getTopNode(0);
- int srcPid = srcTopNode.getPID();
- if (!parent.getPlan().isBuilt()) {
- parent.getPlan().build();
- }
+ ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
- DataChannel channel = new DataChannel(childBlock, parent, srcPid, targetPid, HASH_PARTITION, 32,
- srcTopNode.getOutSchema());
+ DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
channel.setStoreType(storeType);
if (join.getJoinType() != JoinType.CROSS) {
+ // Each block should have the only one output schema
Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
leftBlock.getPlan().getOutSchema(0), rightBlock.getPlan().getOutSchema(0));
if (leftTable) {
@@ -197,22 +170,21 @@ public class GlobalPlanner {
// symmetric repartition join
currentBlock = masterPlan.newExecutionBlock();
- leftBlock.getPlan().build();
- rightBlock.getPlan().build();
- LogicalNode leftTopNode = leftBlock.getPlan().getTopNode(0);
- LogicalNode rightTopNode = rightBlock.getPlan().getTopNode(0);
- ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftTopNode.getOutSchema(),
- leftBlock.getId(), currentBlock.getId(), storeType);
- ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightTopNode.getOutSchema(),
- rightBlock.getId(), currentBlock.getId(), storeType);
+ DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+ DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
+ ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+ ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
joinNode.setLeftChild(leftScan);
joinNode.setRightChild(rightScan);
currentBlock.setPlan(joinNode);
- DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
- DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+ DataChannel.linkChannelAndPlanGroups(leftBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), leftChannel);
+ DataChannel.linkChannelAndPlanGroups(rightBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), rightChannel);
masterPlan.addConnect(leftChannel);
masterPlan.addConnect(rightChannel);
@@ -254,21 +226,21 @@ public class GlobalPlanner {
SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
currentBlock.getEnforcer().enforceSortAggregation(groupbyNode.getPID(), sortSpecs);
- // setup current block with channel
- ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), topMostOfFirstPhase.getOutSchema(),
- childBlock.getId(), currentBlock.getId(), storeType);
- groupbyNode.setChild(scanNode);
- currentBlock.setPlan(groupbyNode);
// setup channel
- childBlock.getPlan().build();
- currentBlock.getPlan().build();
DataChannel channel;
- channel = new DataChannel(childBlock, currentBlock, topMostOfFirstPhase.getPID(), scanNode.getPID(), HASH_PARTITION,
- 32, topMostOfFirstPhase.getOutSchema());
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
channel.setPartitionKey(groupbyNode.getGroupingColumns());
+ channel.setSchema(topMostOfFirstPhase.getOutSchema());
channel.setStoreType(storeType);
+ // setup current block with channel
+ ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
+ groupbyNode.setChild(scanNode);
+ currentBlock.setPlan(groupbyNode);
+
+ DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), channel);
context.plan.addConnect(channel);
return currentBlock;
@@ -300,10 +272,8 @@ public class GlobalPlanner {
dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
- ExecutionPlan subBlockPlan = subBlock.getPlan();
- LogicalNode topNodeOfSubBlock = subBlockPlan.getLogicalNodeGroupWithPID(dataChannel.getSrcPID())
- .toLinkedLogicalNode();
GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), firstPhaseGroupBy);
+ LogicalNode topNodeOfSubBlock = subBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
g1.setChild(topNodeOfSubBlock);
subBlock.setPlan(g1);
@@ -316,28 +286,24 @@ public class GlobalPlanner {
childBlock.setPlan(firstPhaseGroupBy);
currentBlock = masterPlan.newExecutionBlock();
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), firstPhaseGroupBy.getOutSchema(),
- childBlock.getId(), currentBlock.getId(), storeType);
- groupbyNode.setChild(scanNode);
- groupbyNode.setInSchema(scanNode.getOutSchema());
- currentBlock.setPlan(groupbyNode);
-
DataChannel channel;
- childBlock.getPlan().build();
- currentBlock.getPlan().build();
- int srcPID = childBlock.getPlan().getTopNodePid(0);
- int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
- int partitionNum;
if (firstPhaseGroupBy.isEmptyGrouping()) {
- partitionNum = 1;
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
} else {
- partitionNum = 32;
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
}
- channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, partitionNum,
- firstPhaseGroupBy.getOutSchema());
- channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+ channel.setSchema(firstPhaseGroupBy.getOutSchema());
channel.setStoreType(storeType);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ groupbyNode.setChild(scanNode);
+ groupbyNode.setInSchema(scanNode.getOutSchema());
+ currentBlock.setPlan(groupbyNode);
+
+ DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), channel);
masterPlan.addConnect(channel);
}
}
@@ -350,7 +316,7 @@ public class GlobalPlanner {
ExecutionBlock currentBlock;
SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
- LogicalNode childBlockPlan = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
+ LogicalNode childBlockPlan = childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
firstSortNode.setChild(childBlockPlan);
// sort is a non-projectable operator. So, in/out schemas are the same to its child operator.
firstSortNode.setInSchema(childBlockPlan.getOutSchema());
@@ -358,24 +324,18 @@ public class GlobalPlanner {
childBlock.setPlan(firstSortNode);
currentBlock = masterPlan.newExecutionBlock();
- childBlock.getPlan().build();
- int srcPID = childBlock.getPlan().getTopNodePid(0);
- int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+ channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
+ channel.setSchema(firstSortNode.getOutSchema());
+ channel.setStoreType(storeType);
- ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), firstSortNode.getOutSchema(),
- childBlock.getId(), currentBlock.getId(), storeType);
+ ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
currentNode.setChild(secondScan);
currentNode.setInSchema(secondScan.getOutSchema());
currentBlock.setPlan(currentNode);
- childBlock.getPlan().build();
- currentBlock.getPlan().build();
-
- DataChannel channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32,
- firstSortNode.getOutSchema());
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
-// channel.setSchema(firstSortNode.getOutSchema());
- channel.setStoreType(storeType);
+ DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), channel);
masterPlan.addConnect(channel);
return currentBlock;
@@ -389,9 +349,8 @@ public class GlobalPlanner {
// if result table is not a partitioned table, directly store it
if(partitionDesc == null) {
- LogicalNode topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
- currentNode.setChild(topNodeOfBlock);
- currentNode.setInSchema(topNodeOfBlock.getOutSchema());
+ currentNode.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+ currentNode.setInSchema(childBlock.getPlan().getOutSchema(0));
childBlock.setPlan(currentNode);
return childBlock;
}
@@ -402,35 +361,23 @@ public class GlobalPlanner {
LogicalNode childNode = currentNode.getChild();
childBlock.setPlan(childNode);
- // 2. create a ScanNode for scanning shuffle data
- // StoreTableNode as the root node of the new execution block
+ // 2. create a new execution block, pipeline 2 exec blocks through a DataChannel
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), childNode.getOutSchema(),
- childBlock.getId(), currentBlock.getId(), storeType);
- currentNode.setChild(scanNode);
- currentNode.setInSchema(scanNode.getOutSchema());
- currentBlock.setPlan(currentNode);
-
- // 3. pipeline 2 exec blocks through a DataChannel
DataChannel channel = null;
CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
- childBlock.getPlan().build();
- currentBlock.getPlan().build();
- int srcPID = childBlock.getPlan().getTopNodePid(0);
- int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
- channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, childNode.getOutSchema());
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
Column[] columns = new Column[partitionDesc.getColumns().size()];
channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-// channel.setSchema(childNode.getOutSchema());
+ channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else if (partitionsType == CatalogProtos.PartitionsType.HASH) {
- channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION,
- partitionDesc.getNumPartitions(), childNode.getOutSchema());
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION,
+ partitionDesc.getNumPartitions());
Column[] columns = new Column[partitionDesc.getColumns().size()];
channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-// channel.setSchema(childNode.getOutSchema());
+ channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else if(partitionsType == CatalogProtos.PartitionsType.RANGE) {
// TODO
@@ -438,6 +385,16 @@ public class GlobalPlanner {
// TODO
}
+ // 3. create a ScanNode for scanning shuffle data
+ // StoreTableNode as the root node of the new execution block
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ currentNode.setChild(scanNode);
+ currentNode.setInSchema(scanNode.getOutSchema());
+ currentBlock.setPlan(currentNode);
+
+ DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+ currentBlock.getPlan().getFirstPlanGroup(), channel);
+
masterPlan.addConnect(channel);
return currentBlock;
@@ -458,10 +415,9 @@ public class GlobalPlanner {
LogicalNode child = super.visitProjection(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
- LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
-
- node.setChild(nodeOfExecBlock);
- node.setInSchema(nodeOfExecBlock.getOutSchema());
+ execBlock.getPlan().build();
+ node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+ node.setInSchema(execBlock.getPlan().getOutSchema(0));
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
@@ -473,42 +429,36 @@ public class GlobalPlanner {
LogicalNode child = super.visitLimit(context, plan, node, stack);
ExecutionBlock block;
- LogicalNode topNodeOfBlock;
block = context.execBlockMap.remove(child.getPID());
if (child.getType() == NodeType.SORT) {
- topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
- node.setChild(topNodeOfBlock);
+ node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
block.setPlan(node);
ExecutionBlock childBlock = context.plan.getChild(block, 0);
LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
- topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
- childLimit.setChild(topNodeOfBlock);
+ childLimit.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
childBlock.setPlan(childLimit);
DataChannel channel = context.plan.getChannel(childBlock, block);
channel.setPartitionNum(1);
context.execBlockMap.put(node.getPID(), block);
} else {
- topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
- node.setChild(topNodeOfBlock);
+ node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
block.setPlan(node);
ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
- ScanNode scanNode = buildInputExecutor(plan, node.getOutSchema(), block.getId(), newExecBlock.getId(), storeType);
+ DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
+ newChannel.setPartitionKey(new Column[]{});
+ newChannel.setSchema(node.getOutSchema());
+ newChannel.setStoreType(storeType);
+
+ ScanNode scanNode = buildInputExecutor(plan, newChannel);
LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
parentLimit.setChild(scanNode);
newExecBlock.setPlan(parentLimit);
- block.getPlan().build();
- newExecBlock.getPlan().build();
- int srcPID = block.getPlan().getTopNodePid(0);
- int targetPID = newExecBlock.getInputContext().getScanNodes()[0].getPID();
- DataChannel newChannel = new DataChannel(block, newExecBlock, srcPID, targetPID, HASH_PARTITION, 1, node.getOutSchema());
- newChannel.setPartitionKey(new Column[]{});
-// newChannel.setSchema(node.getOutSchema());
- newChannel.setStoreType(storeType);
-
+ DataChannel.linkChannelAndPlanGroups(block.getPlan().getFirstPlanGroup(),
+ newExecBlock.getPlan().getFirstPlanGroup(), newChannel);
context.plan.addConnect(newChannel);
context.execBlockMap.put(parentLimit.getPID(), newExecBlock);
node = parentLimit;
@@ -548,9 +498,9 @@ public class GlobalPlanner {
LogicalNode child = super.visitFilter(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
- LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
- node.setChild(nodeOfExecBlock);
- node.setInSchema(nodeOfExecBlock.getOutSchema());
+ execBlock.getPlan().build();
+ node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+ node.setInSchema(execBlock.getPlan().getOutSchema(0));
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
@@ -597,30 +547,27 @@ public class GlobalPlanner {
}
ExecutionBlock execBlock;
- int targetPID;
+ PlanGroup parentPlanGroup = null;
if (unionBlocks.size() == 0) {
execBlock = context.plan.newExecutionBlock();
- targetPID = -1;
} else {
execBlock = unionBlocks.get(0);
- targetPID = execBlock.getPlan().getTopNodePid(0);
+ parentPlanGroup = execBlock.getPlan().getFirstPlanGroup();
}
for (ExecutionBlock childBlocks : unionBlocks) {
- ExecutionPlan executionPlan = childBlocks.getPlan();
- executionPlan.build();
- LogicalNode unionCandidate = executionPlan.getTopNode(0);
- queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.LEFT).getPID()));
- queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.RIGHT).getPID()));
+ UnionNode union = (UnionNode) childBlocks.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
+ queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
+ queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
}
- execBlock.getPlan().build();
for (ExecutionBlock childBlocks : queryBlockBlocks) {
- childBlocks.getPlan().build();
- LogicalNode topNode = childBlocks.getPlan().getTopNode(0);
- int srcPID = topNode.getPID();
- DataChannel channel = new DataChannel(childBlocks, execBlock, srcPID, targetPID, NONE_PARTITION, 1, topNode.getOutSchema());
+ DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
channel.setStoreType(storeType);
+ if (childBlocks.getPlan().hasPlanGroup()) {
+ DataChannel.linkChannelAndPlanGroups(childBlocks.getPlan().getFirstPlanGroup(),
+ parentPlanGroup, channel);
+ }
context.plan.addConnect(channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6d03642..d648dec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -26,7 +26,6 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import java.util.ArrayList;
@@ -124,7 +123,7 @@ public class MasterPlan {
@VisibleForTesting
public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
- addConnect(new DataChannel(src, target, null, null, type));
+ addConnect(new DataChannel(src, target, type));
}
public boolean isConnected(ExecutionBlock src, ExecutionBlock target) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
index 8d0eecf..6cecf6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner.graph;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.gson.annotations.Expose;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.util.TUtil;
@@ -34,9 +33,9 @@ import java.util.*;
*/
public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
/** map: child -> parent */
- @Expose protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
+ protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
/** map: parent -> child */
- @Expose protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
+ protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
@Override
public int getVertexSize() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 48c7f71..147c6cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -514,6 +514,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
} else {
ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
+ execBlock.getPlan().build();
+ parent.getPlan().build();
setRepartitionIfNecessary(subQuery, channel);
createTasks(subQuery);
@@ -656,7 +658,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static void createTasks(SubQuery subQuery) throws IOException {
MasterPlan masterPlan = subQuery.getMasterPlan();
ExecutionBlock execBlock = subQuery.getBlock();
-// execBlock.getPlan().build();
QueryUnit [] tasks;
if (execBlock.getInputContext() == null) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 48211b4..790a14d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -472,7 +472,7 @@ public class TestPhysicalPlanner {
Column key1 = new Column("score.deptName", Type.TEXT);
Column key2 = new Column("score.class", Type.TEXT);
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+ PartitionType.HASH_PARTITION, numPartitions);
dataChannel.setSchema(rootNode.getOutSchema());
dataChannel.setPartitionKey(new Column[]{key1, key2});
List<DataChannel> channels = new ArrayList<DataChannel>();
@@ -534,7 +534,7 @@ public class TestPhysicalPlanner {
LogicalNode rootNode = optimizer.optimize(plan);
int numPartitions = 1;
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+ PartitionType.HASH_PARTITION, numPartitions);
dataChannel.setSchema(rootNode.getOutSchema());
dataChannel.setPartitionKey(new Column[]{});
List<DataChannel> channels = new ArrayList<DataChannel>();
@@ -852,7 +852,7 @@ public class TestPhysicalPlanner {
SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.RANGE_PARTITION);
+ PartitionType.RANGE_PARTITION);
channel.setSchema(rootNode.getOutSchema());
channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
List<DataChannel> channels = new ArrayList<DataChannel>();