You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:40 UTC
[43/50] [abbrv] git commit: DAG-execplan: need to refactor the execution plan
DAG-execplan: need to refactor the execution plan
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/53e84643
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/53e84643
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/53e84643
Branch: refs/heads/DAG-execplan
Commit: 53e8464333a0055269d47a4855cb9b606cc2707b
Parents: 6474b34
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 26 23:53:11 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 26 23:53:11 2013 +0900
----------------------------------------------------------------------
.../apache/tajo/engine/planner/LogicalPlan.java | 3 +-
.../engine/planner/global/ExecutionBlock.java | 4 +
.../engine/planner/global/ExecutionPlan.java | 28 ++---
.../engine/planner/global/GlobalPlanner.java | 115 ++++++++++++-------
.../tajo/engine/planner/global/MasterPlan.java | 2 +-
.../planner/physical/IndexedStoreExec.java | 4 +-
.../planner/physical/PartitionedStoreExec.java | 2 +-
.../planner/global/TestExecutionPlan.java | 5 +-
.../planner/physical/TestBNLJoinExec.java | 10 +-
.../planner/physical/TestBSTIndexExec.java | 2 +-
.../planner/physical/TestExternalSortExec.java | 2 +-
.../physical/TestFullOuterHashJoinExec.java | 26 +++--
.../physical/TestFullOuterMergeJoinExec.java | 31 +++--
.../planner/physical/TestHashAntiJoinExec.java | 2 +-
.../planner/physical/TestHashJoinExec.java | 5 +-
.../planner/physical/TestHashSemiJoinExec.java | 2 +-
.../physical/TestLeftOuterHashJoinExec.java | 25 ++--
.../physical/TestLeftOuterNLJoinExec.java | 26 +++--
.../planner/physical/TestMergeJoinExec.java | 2 +-
.../engine/planner/physical/TestNLJoinExec.java | 11 +-
.../planner/physical/TestPhysicalPlanner.java | 44 +++----
.../physical/TestRightOuterHashJoinExec.java | 16 ++-
.../physical/TestRightOuterMergeJoinExec.java | 30 +++--
.../engine/planner/physical/TestSortExec.java | 2 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 9 +-
25 files changed, 240 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index bfc1d16..6ba8c09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.gson.annotations.Expose;
import org.apache.tajo.algebra.*;
import org.apache.tajo.annotation.NotThreadSafe;
import org.apache.tajo.catalog.Column;
@@ -62,7 +63,7 @@ public class LogicalPlan {
private PIDFactory pidFactory = new PIDFactory();
public static class PIDFactory {
- private int nextPid = 0;
+ @Expose private int nextPid = 0;
public int newPID() {
return nextPid++;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index f964302..17e365a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -57,6 +57,10 @@ public class ExecutionBlock {
executionPlan.setPlan(plan);
}
+ public void setPlan(ExecutionPlan plan) {
+ this.executionPlan = plan;
+ }
+
public ExecutionPlan getPlan() {
return executionPlan;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index ee42eef..4a83614 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -56,7 +56,7 @@ public class ExecutionPlan implements GsonObject {
public ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
this(pidFactory);
- this.terminalNode = PlannerUtil.clone(pidFactory, terminalNode);
+ this.terminalNode = terminalNode;
}
public void setPlan(LogicalNode plan) {
@@ -203,25 +203,27 @@ public class ExecutionPlan implements GsonObject {
this.graph.removeEdge(child.getPID(), parent.getPID());
}
- private static class LogicalNodeIdAndTag {
+ private static class PIDAndTag {
@Expose int id;
@Expose Tag tag;
- public LogicalNodeIdAndTag(int id, Tag tag) {
+ public PIDAndTag(int id, Tag tag) {
this.id = id;
this.tag = tag;
}
}
public static class ExecutionPlanJsonHelper implements GsonObject {
+ @Expose private final PIDFactory pidFactory;
@Expose private final boolean hasJoinPlan;
@Expose private final boolean hasUnionPlan;
@Expose private final InputContext inputContext;
@Expose private final LogicalRootNode terminalNode;
@Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
- @Expose Map<Integer, List<LogicalNodeIdAndTag>> adjacentList = new HashMap<Integer, List<LogicalNodeIdAndTag>>();
+ @Expose Map<Integer, List<PIDAndTag>> adjacentList = new HashMap<Integer, List<PIDAndTag>>();
public ExecutionPlanJsonHelper(ExecutionPlan plan) {
+ this.pidFactory = plan.pidFactory;
this.hasJoinPlan = plan.hasJoinPlan;
this.hasUnionPlan = plan.hasUnionPlan;
this.inputContext = plan.getInputContext();
@@ -229,7 +231,7 @@ public class ExecutionPlan implements GsonObject {
this.vertices.putAll(plan.vertices);
Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
int parentId, childId;
- List<LogicalNodeIdAndTag> adjacents;
+ List<PIDAndTag> adjacents;
// convert the graph to an adjacent list
for (ExecutionPlanEdge edge : edges) {
@@ -239,10 +241,10 @@ public class ExecutionPlan implements GsonObject {
if (adjacentList.containsKey(childId)) {
adjacents = adjacentList.get(childId);
} else {
- adjacents = new ArrayList<LogicalNodeIdAndTag>();
+ adjacents = new ArrayList<PIDAndTag>();
adjacentList.put(childId, adjacents);
}
- adjacents.add(new LogicalNodeIdAndTag(parentId, edge.getTag()));
+ adjacents.add(new PIDAndTag(parentId, edge.getTag()));
}
}
@@ -253,15 +255,15 @@ public class ExecutionPlan implements GsonObject {
public ExecutionPlan toExecutionPlan() {
// TODO: check that it works
- ExecutionPlan plan = new ExecutionPlan(null, this.terminalNode);
+ ExecutionPlan plan = new ExecutionPlan(this.pidFactory, this.terminalNode);
plan.hasJoinPlan = this.hasJoinPlan;
plan.hasUnionPlan = this.hasUnionPlan;
plan.setInputContext(this.inputContext);
plan.vertices.putAll(this.vertices);
- for (Entry<Integer, List<LogicalNodeIdAndTag>> e : this.adjacentList.entrySet()) {
+ for (Entry<Integer, List<PIDAndTag>> e : this.adjacentList.entrySet()) {
LogicalNode child = this.vertices.get(e.getKey());
- for (LogicalNodeIdAndTag idAndTag : e.getValue()) {
+ for (PIDAndTag idAndTag : e.getValue()) {
plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
}
}
@@ -283,8 +285,8 @@ public class ExecutionPlan implements GsonObject {
public boolean compare() {
Stack<Integer> s1 = new Stack<Integer>();
Stack<Integer> s2 = new Stack<Integer>();
- s1.push(plan1.terminalNode.getPID());
- s2.push(plan2.terminalNode.getPID());
+ s1.push(plan1.getTopNode(0).getPID());
+ s2.push(plan2.getTopNode(0).getPID());
return recursiveCompare(s1, s2);
}
@@ -302,9 +304,9 @@ public class ExecutionPlan implements GsonObject {
for (Integer child : plan2.graph.getChilds(l2)) {
s2.push(child);
}
+ return recursiveCompare(s1, s2);
} else {
equal &= true;
- return recursiveCompare(s1, s2);
}
} else {
equal = false;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index e0118ea..1cebc8e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -42,7 +42,8 @@ import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
import java.util.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.NONE_PARTITION;
/**
* Build DAG
@@ -298,14 +299,16 @@ public class GlobalPlanner {
currentBlock = masterPlan.newExecutionBlock();
DataChannel channel;
+ int srcPID = childBlock.getPlan().getTopNodePid(0);
+ int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
if (firstPhaseGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 1, firstPhaseGroupBy.getOutSchema());
channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
} else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, firstPhaseGroupBy.getOutSchema());
channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
}
- channel.setSchema(firstPhaseGroupBy.getOutSchema());
+// channel.setSchema(firstPhaseGroupBy.getOutSchema());
channel.setStoreType(storeType);
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -323,18 +326,23 @@ public class GlobalPlanner {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
- SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode);
- LogicalNode childBlockPlan = childBlock.getPlan();
- firstSortNode.setChild(childBlockPlan);
+ SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
+// LogicalNode childBlockPlan = childBlock.getPlan();
+ ExecutionPlan childBlockPlan = childBlock.getPlan();
+ childBlockPlan.add(childBlockPlan.getTopNode(0), firstSortNode, Tag.SINGLE);
+// firstSortNode.setChild(childBlockPlan.getTopNode(0));
// sort is a non-projectable operator. So, in/out schemas are the same to its child operator.
- firstSortNode.setInSchema(childBlockPlan.getOutSchema());
- firstSortNode.setOutSchema(childBlockPlan.getOutSchema());
- childBlock.setPlan(firstSortNode);
+ firstSortNode.setInSchema(childBlockPlan.getOutSchema(0));
+ firstSortNode.setOutSchema(childBlockPlan.getOutSchema(0));
+// childBlock.setPlan(firstSortNode);
currentBlock = masterPlan.newExecutionBlock();
- DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+ int srcPID = childBlock.getPlan().getTopNodePid(0);
+ int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32,
+ firstSortNode.getOutSchema());
channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
- channel.setSchema(firstSortNode.getOutSchema());
+// channel.setSchema(firstSortNode.getOutSchema());
channel.setStoreType(storeType);
ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -354,9 +362,11 @@ public class GlobalPlanner {
// if result table is not a partitioned table, directly store it
if(partitionDesc == null) {
- currentNode.setChild(childBlock.getPlan());
- currentNode.setInSchema(childBlock.getPlan().getOutSchema());
- childBlock.setPlan(currentNode);
+// currentNode.setChild(childBlock.getPlan().getTopNode(0));
+ ExecutionPlan executionPlan = childBlock.getPlan();
+ executionPlan.add(executionPlan.getTopNode(0), currentNode, Tag.SINGLE);
+ currentNode.setInSchema(childBlock.getPlan().getOutSchema(0));
+// childBlock.setPlan(currentNode);
return childBlock;
}
@@ -371,18 +381,20 @@ public class GlobalPlanner {
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
DataChannel channel = null;
CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+ int srcPID = childBlock.getPlan().getTopNodePid(0);
+ int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, childNode.getOutSchema());
Column[] columns = new Column[partitionDesc.getColumns().size()];
channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
- channel.setSchema(childNode.getOutSchema());
+// channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else if (partitionsType == CatalogProtos.PartitionsType.HASH) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION,
- partitionDesc.getNumPartitions());
+ channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION,
+ partitionDesc.getNumPartitions(), childNode.getOutSchema());
Column[] columns = new Column[partitionDesc.getColumns().size()];
channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
- channel.setSchema(childNode.getOutSchema());
+// channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else if(partitionsType == CatalogProtos.PartitionsType.RANGE) {
// TODO
@@ -418,9 +430,11 @@ public class GlobalPlanner {
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
- node.setChild(execBlock.getPlan());
- node.setInSchema(execBlock.getPlan().getOutSchema());
- execBlock.setPlan(node);
+// node.setChild(execBlock.getPlan().getTopNode(0));
+ ExecutionPlan executionPlan = execBlock.getPlan();
+ executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
+ node.setInSchema(execBlock.getPlan().getOutSchema(0));
+// execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
@@ -433,31 +447,40 @@ public class GlobalPlanner {
ExecutionBlock block;
block = context.execBlockMap.remove(child.getPID());
if (child.getType() == NodeType.SORT) {
- node.setChild(block.getPlan());
- block.setPlan(node);
+// node.setChild(block.getPlan());
+// block.setPlan(node);
+ ExecutionPlan blockPlan = block.getPlan();
+ blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
ExecutionBlock childBlock = context.plan.getChild(block, 0);
- LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
- childLimit.setChild(childBlock.getPlan());
- childBlock.setPlan(childLimit);
+ LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
+// childLimit.setChild(childBlock.getPlan());
+// childBlock.setPlan(childLimit);
+ ExecutionPlan childPlan = childBlock.getPlan();
+ childPlan.add(childPlan.getTopNode(0), childLimit, Tag.SINGLE);
DataChannel channel = context.plan.getChannel(childBlock, block);
channel.setPartitionNum(1);
context.execBlockMap.put(node.getPID(), block);
} else {
- node.setChild(block.getPlan());
- block.setPlan(node);
+// node.setChild(block.getPlan());
+// block.setPlan(node);
+ ExecutionPlan blockPlan = block.getPlan();
+ blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
- DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
+ ScanNode scanNode = buildInputExecutor(plan, node.getOutSchema(), block.getId(), newExecBlock.getId(), storeType);
+ LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
+ parentLimit.setChild(scanNode);
+ newExecBlock.setPlan(parentLimit);
+
+ int srcPID = block.getPlan().getTopNodePid(0);
+ int targetPID = newExecBlock.getInputContext().getScanNodes()[0].getPID();
+ DataChannel newChannel = new DataChannel(block, newExecBlock, srcPID, targetPID, HASH_PARTITION, 1, node.getOutSchema());
newChannel.setPartitionKey(new Column[]{});
- newChannel.setSchema(node.getOutSchema());
+// newChannel.setSchema(node.getOutSchema());
newChannel.setStoreType(storeType);
- ScanNode scanNode = buildInputExecutor(plan, newChannel);
- LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
- parentLimit.setChild(scanNode);
- newExecBlock.setPlan(parentLimit);
context.plan.addConnect(newChannel);
context.execBlockMap.put(parentLimit.getPID(), newExecBlock);
node = parentLimit;
@@ -497,9 +520,11 @@ public class GlobalPlanner {
LogicalNode child = super.visitFilter(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
- node.setChild(execBlock.getPlan());
- node.setInSchema(execBlock.getPlan().getOutSchema());
- execBlock.setPlan(node);
+// node.setChild(execBlock.getPlan());
+ node.setInSchema(execBlock.getPlan().getOutSchema(0));
+// execBlock.setPlan(node);
+ ExecutionPlan executionPlan = execBlock.getPlan();
+ executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
@@ -552,13 +577,17 @@ public class GlobalPlanner {
}
for (ExecutionBlock childBlocks : unionBlocks) {
- UnionNode union = (UnionNode) childBlocks.getPlan();
- queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
- queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
+ ExecutionPlan executionPlan = childBlocks.getPlan();
+ LogicalNode unionCandidate = executionPlan.getTopNode(0);
+ queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.LEFT).getPID()));
+ queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.RIGHT).getPID()));
}
+ int targetPID = execBlock.getInputContext().getScanNodes()[0].getPID();
for (ExecutionBlock childBlocks : queryBlockBlocks) {
- DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+ LogicalNode topNode = childBlocks.getPlan().getTopNode(0);
+ int srcPID = topNode.getPID();
+ DataChannel channel = new DataChannel(childBlocks, execBlock, srcPID, targetPID, NONE_PARTITION, 1, topNode.getOutSchema());
channel.setStoreType(storeType);
context.plan.addConnect(channel);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 725e3f7..bef025a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -93,7 +93,7 @@ public class MasterPlan {
}
public ExecutionBlock newExecutionBlock() {
- ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(),
+ ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(), plan.getPidFactory(),
(LogicalRootNode) plan.getRootBlock().getRoot());
execBlockMap.put(newExecBlock.getId(), newExecBlock);
return newExecBlock;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index c9379ca..6c5693f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -69,8 +69,8 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
this.comp = new TupleComparator(keySchema, sortSpecs);
Path storeTablePath = new Path(context.getWorkDir(), "output");
LOG.info("Output data directory: " + storeTablePath);
- this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
- context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+ this.meta = CatalogUtil.newTableMeta(context.getOutgoingChannels() != null && context.getOutgoingChannels().get(0) != null
+ ? context.getOutgoingChannels().get(0).getStoreType() : CatalogProtos.StoreType.RAW);
FileSystem fs = new RawLocalFileSystem();
fs.mkdirs(storeTablePath);
this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index c81d5d9..aee351b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -65,7 +65,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
Preconditions.checkArgument(plan.hasPartitionKey());
this.plan = plan;
- this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
+ this.meta = CatalogUtil.newTableMeta(context.getOutgoingChannels().get(0).getStoreType());
// about the partitions
this.numPartitions = this.plan.getNumPartitions();
int i = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
index e908863..e208d24 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
import org.apache.tajo.engine.planner.logical.*;
import org.junit.Test;
@@ -47,7 +48,7 @@ public class TestExecutionPlan {
groupbyNode.setChild(scanNode);
- ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(4));
+ ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(4));
plan.addPlan(groupbyNode);
String json = plan.toJson();
@@ -82,7 +83,7 @@ public class TestExecutionPlan {
joinNode.setLeftChild(scanNode);
joinNode.setRightChild(scanNode2);
- ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(5));
+ ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(5));
plan.addPlan(root1);
plan.addPlan(root2);
assertEquals(1, plan.getInputContext().size());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index d816bb1..e8eaccb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -137,7 +137,8 @@ public class TestBNLJoinExec {
@Test
public final void testBNLCrossJoin() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
@@ -152,7 +153,7 @@ public class TestBNLJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -173,7 +174,8 @@ public class TestBNLJoinExec {
@Test
public final void testBNLInnerJoin() throws IOException, PlanningException {
Expr context = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
@@ -191,7 +193,7 @@ public class TestBNLJoinExec {
merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 2b7f745..0df1746 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -169,7 +169,7 @@ public class TestBSTIndexExec {
LogicalNode rootNode = optimizer.optimize(plan);
TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
exec = ((PhysicalRootExec)exec).getChild(0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 6aa0eef..047e3ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -121,7 +121,7 @@ public class TestExternalSortExec {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index e6aaddf..b11582a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -21,9 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.engine.planner.global.ExecutionPlan;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -34,12 +31,15 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -250,7 +250,8 @@ public class TestFullOuterHashJoinExec {
@Test
public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -264,7 +265,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -290,7 +291,8 @@ public class TestFullOuterHashJoinExec {
@Test
public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -304,7 +306,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -329,7 +331,8 @@ public class TestFullOuterHashJoinExec {
@Test
public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -343,7 +346,7 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -369,7 +372,8 @@ public class TestFullOuterHashJoinExec {
@Test
public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[3]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -384,7 +388,7 @@ public class TestFullOuterHashJoinExec {
workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index f474979..675927e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -43,6 +43,7 @@ import org.apache.tajo.util.TUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import sun.util.logging.resources.logging_it;
import java.io.IOException;
@@ -293,7 +294,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -307,7 +309,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -331,7 +333,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -345,7 +348,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -369,7 +372,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -383,7 +387,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -407,7 +411,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[3]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -421,7 +426,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -448,7 +453,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[4]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -463,7 +469,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -488,7 +494,8 @@ public class TestFullOuterMergeJoinExec {
@Test
public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[5]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -503,7 +510,7 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index c3983fd..4bcac45 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -158,7 +158,7 @@ public class TestHashAntiJoinExec {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 68e5ef2..722a0e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -136,7 +136,8 @@ public class TestHashJoinExec {
@Test
public final void testHashInnerJoin() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
@@ -151,7 +152,7 @@ public class TestHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index b4597c7..8caaa5a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -163,7 +163,7 @@ public class TestHashSemiJoinExec {
System.out.println(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 8082b07..25b5128 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -251,7 +251,8 @@ public class TestLeftOuterHashJoinExec {
@Test
public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -265,7 +266,7 @@ public class TestLeftOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -297,9 +298,10 @@ public class TestLeftOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -339,9 +341,10 @@ public class TestLeftOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -382,9 +385,10 @@ public class TestLeftOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[3]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -425,9 +429,10 @@ public class TestLeftOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[4]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 54a3c49..3cc4362 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
@@ -251,9 +252,10 @@ public class TestLeftOuterNLJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -293,9 +295,10 @@ public class TestLeftOuterNLJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -335,9 +338,10 @@ public class TestLeftOuterNLJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -380,9 +384,10 @@ public class TestLeftOuterNLJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[3]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -424,9 +429,10 @@ public class TestLeftOuterNLJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr context = analyzer.parse(QUERIES[4]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 57afef5..04121a7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -166,7 +166,7 @@ public class TestMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(root);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index a5270a3..85f972f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
@@ -151,9 +152,10 @@ public class TestNLJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -182,10 +184,11 @@ public class TestNLJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(context);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
//LogicalOptimizer.optimize(ctx, plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 078d24d..48211b4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -194,7 +194,7 @@ public class TestPhysicalPlanner {
LogicalNode rootNode =plan.getRootBlock().getRoot();
optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -225,7 +225,7 @@ public class TestPhysicalPlanner {
LogicalNode rootNode =plan.getRootBlock().getRoot();
optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -255,7 +255,7 @@ public class TestPhysicalPlanner {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -288,7 +288,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -320,7 +320,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(plan.getRootBlock().getRoot());
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -386,7 +386,7 @@ public class TestPhysicalPlanner {
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -429,7 +429,7 @@ public class TestPhysicalPlanner {
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -483,7 +483,7 @@ public class TestPhysicalPlanner {
FileSystem fs = sm.getFileSystem();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -543,7 +543,7 @@ public class TestPhysicalPlanner {
TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -601,7 +601,7 @@ public class TestPhysicalPlanner {
}
}
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -639,7 +639,7 @@ public class TestPhysicalPlanner {
}
}
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -663,7 +663,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -692,7 +692,7 @@ public class TestPhysicalPlanner {
UnionNode union = new UnionNode(plan.newPID(), root.getChild(), clonePlan(plan, root.getChild()));
root.setChild(union);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(root);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -741,7 +741,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -757,7 +757,7 @@ public class TestPhysicalPlanner {
plan = planner.createPlan(expr);
rootNode = optimizer.optimize(plan);
- execPlan = new ExecutionPlan();
+ execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf, sm);
exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -783,7 +783,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -814,7 +814,7 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -859,7 +859,7 @@ public class TestPhysicalPlanner {
channels.add(channel);
ctx.setOutgoingChannels(channels);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -952,7 +952,7 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -976,7 +976,7 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- execPlan = new ExecutionPlan();
+ execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -1006,7 +1006,7 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -1030,7 +1030,7 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- execPlan = new ExecutionPlan();
+ execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 42a0cb7..13d1965 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
@@ -229,9 +230,10 @@ public class TestRightOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -271,9 +273,10 @@ public class TestRightOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -313,9 +316,10 @@ public class TestRightOuterHashJoinExec {
TaskAttemptContext ctx = new TaskAttemptContext(conf,
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
Expr expr = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 948680d..d7d360c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -294,7 +294,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuterMergeJoin0() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[0]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -308,7 +309,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -331,7 +332,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuter_MergeJoin1() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[1]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -345,7 +347,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -367,7 +369,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuterMergeJoin2() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[2]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -381,7 +384,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -403,7 +406,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuter_MergeJoin3() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[3]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -417,7 +421,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -440,7 +444,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuter_MergeJoin4() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[4]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -455,7 +460,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -477,7 +482,8 @@ public class TestRightOuterMergeJoinExec {
@Test
public final void testRightOuterMergeJoin5() throws IOException, PlanningException {
Expr expr = analyzer.parse(QUERIES[5]);
- LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+ LogicalPlan logicalPlan = planner.createPlan(expr);
+ LogicalNode plan = logicalPlan.getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -493,7 +499,7 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
execPlan.addPlan(plan);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index fe90545..fb4d71e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -119,7 +119,7 @@ public class TestSortExec {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index dd6ff01..d722e7c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -36,10 +36,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.IndexedStoreExec;
-import org.apache.tajo.engine.planner.physical.MemSortExec;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.planner.physical.ProjectionExec;
+import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -142,7 +139,7 @@ public class TestRangeRetrieverHandler {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -258,7 +255,7 @@ public class TestRangeRetrieverHandler {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- ExecutionPlan execPlan = new ExecutionPlan();
+ ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
execPlan.addPlan(rootNode);
PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);