You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:40 UTC

[43/50] [abbrv] git commit: DAG-execplan: need to refactor the execution plan

DAG-execplan: need to refactor the execution plan


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/53e84643
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/53e84643
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/53e84643

Branch: refs/heads/DAG-execplan
Commit: 53e8464333a0055269d47a4855cb9b606cc2707b
Parents: 6474b34
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 26 23:53:11 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 26 23:53:11 2013 +0900

----------------------------------------------------------------------
 .../apache/tajo/engine/planner/LogicalPlan.java |   3 +-
 .../engine/planner/global/ExecutionBlock.java   |   4 +
 .../engine/planner/global/ExecutionPlan.java    |  28 ++---
 .../engine/planner/global/GlobalPlanner.java    | 115 ++++++++++++-------
 .../tajo/engine/planner/global/MasterPlan.java  |   2 +-
 .../planner/physical/IndexedStoreExec.java      |   4 +-
 .../planner/physical/PartitionedStoreExec.java  |   2 +-
 .../planner/global/TestExecutionPlan.java       |   5 +-
 .../planner/physical/TestBNLJoinExec.java       |  10 +-
 .../planner/physical/TestBSTIndexExec.java      |   2 +-
 .../planner/physical/TestExternalSortExec.java  |   2 +-
 .../physical/TestFullOuterHashJoinExec.java     |  26 +++--
 .../physical/TestFullOuterMergeJoinExec.java    |  31 +++--
 .../planner/physical/TestHashAntiJoinExec.java  |   2 +-
 .../planner/physical/TestHashJoinExec.java      |   5 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   2 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  25 ++--
 .../physical/TestLeftOuterNLJoinExec.java       |  26 +++--
 .../planner/physical/TestMergeJoinExec.java     |   2 +-
 .../engine/planner/physical/TestNLJoinExec.java |  11 +-
 .../planner/physical/TestPhysicalPlanner.java   |  44 +++----
 .../physical/TestRightOuterHashJoinExec.java    |  16 ++-
 .../physical/TestRightOuterMergeJoinExec.java   |  30 +++--
 .../engine/planner/physical/TestSortExec.java   |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   9 +-
 25 files changed, 240 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index bfc1d16..6ba8c09 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.gson.annotations.Expose;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.NotThreadSafe;
 import org.apache.tajo.catalog.Column;
@@ -62,7 +63,7 @@ public class LogicalPlan {
   private PIDFactory pidFactory = new PIDFactory();
 
   public static class PIDFactory {
-    private int nextPid = 0;
+    @Expose private int nextPid = 0;
 
     public int newPID() {
       return nextPid++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index f964302..17e365a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -57,6 +57,10 @@ public class ExecutionBlock {
     executionPlan.setPlan(plan);
   }
 
+  public void setPlan(ExecutionPlan plan) {
+    this.executionPlan = plan;
+  }
+
   public ExecutionPlan getPlan() {
     return executionPlan;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index ee42eef..4a83614 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -56,7 +56,7 @@ public class ExecutionPlan implements GsonObject {
 
   public ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
     this(pidFactory);
-    this.terminalNode = PlannerUtil.clone(pidFactory, terminalNode);
+    this.terminalNode = terminalNode;
   }
 
   public void setPlan(LogicalNode plan) {
@@ -203,25 +203,27 @@ public class ExecutionPlan implements GsonObject {
     this.graph.removeEdge(child.getPID(), parent.getPID());
   }
 
-  private static class LogicalNodeIdAndTag {
+  private static class PIDAndTag {
     @Expose int id;
     @Expose Tag tag;
 
-    public LogicalNodeIdAndTag(int id, Tag tag) {
+    public PIDAndTag(int id, Tag tag) {
       this.id = id;
       this.tag = tag;
     }
   }
 
   public static class ExecutionPlanJsonHelper implements GsonObject {
+    @Expose private final PIDFactory pidFactory;
     @Expose private final boolean hasJoinPlan;
     @Expose private final boolean hasUnionPlan;
     @Expose private final InputContext inputContext;
     @Expose private final LogicalRootNode terminalNode;
     @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
-    @Expose Map<Integer, List<LogicalNodeIdAndTag>> adjacentList = new HashMap<Integer, List<LogicalNodeIdAndTag>>();
+    @Expose Map<Integer, List<PIDAndTag>> adjacentList = new HashMap<Integer, List<PIDAndTag>>();
 
     public ExecutionPlanJsonHelper(ExecutionPlan plan) {
+      this.pidFactory = plan.pidFactory;
       this.hasJoinPlan = plan.hasJoinPlan;
       this.hasUnionPlan = plan.hasUnionPlan;
       this.inputContext = plan.getInputContext();
@@ -229,7 +231,7 @@ public class ExecutionPlan implements GsonObject {
       this.vertices.putAll(plan.vertices);
       Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
       int parentId, childId;
-      List<LogicalNodeIdAndTag> adjacents;
+      List<PIDAndTag> adjacents;
 
       // convert the graph to an adjacent list
       for (ExecutionPlanEdge edge : edges) {
@@ -239,10 +241,10 @@ public class ExecutionPlan implements GsonObject {
         if (adjacentList.containsKey(childId)) {
           adjacents = adjacentList.get(childId);
         } else {
-          adjacents = new ArrayList<LogicalNodeIdAndTag>();
+          adjacents = new ArrayList<PIDAndTag>();
           adjacentList.put(childId, adjacents);
         }
-        adjacents.add(new LogicalNodeIdAndTag(parentId, edge.getTag()));
+        adjacents.add(new PIDAndTag(parentId, edge.getTag()));
       }
     }
 
@@ -253,15 +255,15 @@ public class ExecutionPlan implements GsonObject {
 
     public ExecutionPlan toExecutionPlan() {
       // TODO: check that it works
-      ExecutionPlan plan = new ExecutionPlan(null, this.terminalNode);
+      ExecutionPlan plan = new ExecutionPlan(this.pidFactory, this.terminalNode);
       plan.hasJoinPlan = this.hasJoinPlan;
       plan.hasUnionPlan = this.hasUnionPlan;
       plan.setInputContext(this.inputContext);
       plan.vertices.putAll(this.vertices);
 
-      for (Entry<Integer, List<LogicalNodeIdAndTag>> e : this.adjacentList.entrySet()) {
+      for (Entry<Integer, List<PIDAndTag>> e : this.adjacentList.entrySet()) {
         LogicalNode child = this.vertices.get(e.getKey());
-        for (LogicalNodeIdAndTag idAndTag : e.getValue()) {
+        for (PIDAndTag idAndTag : e.getValue()) {
           plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
         }
       }
@@ -283,8 +285,8 @@ public class ExecutionPlan implements GsonObject {
     public boolean compare() {
       Stack<Integer> s1 = new Stack<Integer>();
       Stack<Integer> s2 = new Stack<Integer>();
-      s1.push(plan1.terminalNode.getPID());
-      s2.push(plan2.terminalNode.getPID());
+      s1.push(plan1.getTopNode(0).getPID());
+      s2.push(plan2.getTopNode(0).getPID());
       return recursiveCompare(s1, s2);
     }
 
@@ -302,9 +304,9 @@ public class ExecutionPlan implements GsonObject {
             for (Integer child : plan2.graph.getChilds(l2)) {
               s2.push(child);
             }
+            return recursiveCompare(s1, s2);
           } else {
             equal &= true;
-            return recursiveCompare(s1, s2);
           }
         } else {
           equal = false;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index e0118ea..1cebc8e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -42,7 +42,8 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.NONE_PARTITION;
 
 /**
  * Build DAG
@@ -298,14 +299,16 @@ public class GlobalPlanner {
         currentBlock = masterPlan.newExecutionBlock();
 
         DataChannel channel;
+        int srcPID = childBlock.getPlan().getTopNodePid(0);
+        int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
         if (firstPhaseGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+          channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 1, firstPhaseGroupBy.getOutSchema());
           channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         } else {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+          channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, firstPhaseGroupBy.getOutSchema());
           channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         }
-        channel.setSchema(firstPhaseGroupBy.getOutSchema());
+//        channel.setSchema(firstPhaseGroupBy.getOutSchema());
         channel.setStoreType(storeType);
 
         ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -323,18 +326,23 @@ public class GlobalPlanner {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode);
-    LogicalNode childBlockPlan = childBlock.getPlan();
-    firstSortNode.setChild(childBlockPlan);
+    SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
+//    LogicalNode childBlockPlan = childBlock.getPlan();
+    ExecutionPlan childBlockPlan = childBlock.getPlan();
+    childBlockPlan.add(childBlockPlan.getTopNode(0), firstSortNode, Tag.SINGLE);
+//    firstSortNode.setChild(childBlockPlan.getTopNode(0));
     // sort is a non-projectable operator. So, in/out schemas are the same to its child operator.
-    firstSortNode.setInSchema(childBlockPlan.getOutSchema());
-    firstSortNode.setOutSchema(childBlockPlan.getOutSchema());
-    childBlock.setPlan(firstSortNode);
+    firstSortNode.setInSchema(childBlockPlan.getOutSchema(0));
+    firstSortNode.setOutSchema(childBlockPlan.getOutSchema(0));
+//    childBlock.setPlan(firstSortNode);
 
     currentBlock = masterPlan.newExecutionBlock();
-    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+    int srcPID = childBlock.getPlan().getTopNodePid(0);
+    int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+    DataChannel channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32,
+        firstSortNode.getOutSchema());
     channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
-    channel.setSchema(firstSortNode.getOutSchema());
+//    channel.setSchema(firstSortNode.getOutSchema());
     channel.setStoreType(storeType);
 
     ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -354,9 +362,11 @@ public class GlobalPlanner {
 
     // if result table is not a partitioned table, directly store it
     if(partitionDesc == null) {
-      currentNode.setChild(childBlock.getPlan());
-      currentNode.setInSchema(childBlock.getPlan().getOutSchema());
-      childBlock.setPlan(currentNode);
+//      currentNode.setChild(childBlock.getPlan().getTopNode(0));
+      ExecutionPlan executionPlan = childBlock.getPlan();
+      executionPlan.add(executionPlan.getTopNode(0), currentNode, Tag.SINGLE);
+      currentNode.setInSchema(childBlock.getPlan().getOutSchema(0));
+//      childBlock.setPlan(currentNode);
       return childBlock;
     }
 
@@ -371,18 +381,20 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
     DataChannel channel = null;
     CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+    int srcPID = childBlock.getPlan().getTopNodePid(0);
+    int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
     if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
-      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+      channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, childNode.getOutSchema());
       Column[] columns = new Column[partitionDesc.getColumns().size()];
       channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-      channel.setSchema(childNode.getOutSchema());
+//      channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
     } else if (partitionsType == CatalogProtos.PartitionsType.HASH) {
-      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION,
-          partitionDesc.getNumPartitions());
+      channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION,
+          partitionDesc.getNumPartitions(), childNode.getOutSchema());
       Column[] columns = new Column[partitionDesc.getColumns().size()];
       channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-      channel.setSchema(childNode.getOutSchema());
+//      channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
     } else if(partitionsType == CatalogProtos.PartitionsType.RANGE) {
       // TODO
@@ -418,9 +430,11 @@ public class GlobalPlanner {
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
 
-      node.setChild(execBlock.getPlan());
-      node.setInSchema(execBlock.getPlan().getOutSchema());
-      execBlock.setPlan(node);
+//      node.setChild(execBlock.getPlan().getTopNode(0));
+      ExecutionPlan executionPlan = execBlock.getPlan();
+      executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
+      node.setInSchema(execBlock.getPlan().getOutSchema(0));
+//      execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);
       return node;
     }
@@ -433,31 +447,40 @@ public class GlobalPlanner {
       ExecutionBlock block;
       block = context.execBlockMap.remove(child.getPID());
       if (child.getType() == NodeType.SORT) {
-        node.setChild(block.getPlan());
-        block.setPlan(node);
+//        node.setChild(block.getPlan());
+//        block.setPlan(node);
+        ExecutionPlan blockPlan = block.getPlan();
+        blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
 
         ExecutionBlock childBlock = context.plan.getChild(block, 0);
-        LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
-        childLimit.setChild(childBlock.getPlan());
-        childBlock.setPlan(childLimit);
+        LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
+//        childLimit.setChild(childBlock.getPlan());
+//        childBlock.setPlan(childLimit);
+        ExecutionPlan childPlan = childBlock.getPlan();
+        childPlan.add(childPlan.getTopNode(0), childLimit, Tag.SINGLE);
 
         DataChannel channel = context.plan.getChannel(childBlock, block);
         channel.setPartitionNum(1);
         context.execBlockMap.put(node.getPID(), block);
       } else {
-        node.setChild(block.getPlan());
-        block.setPlan(node);
+//        node.setChild(block.getPlan());
+//        block.setPlan(node);
+        ExecutionPlan blockPlan = block.getPlan();
+        blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
 
         ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
-        DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
+        ScanNode scanNode = buildInputExecutor(plan, node.getOutSchema(), block.getId(), newExecBlock.getId(), storeType);
+        LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
+        parentLimit.setChild(scanNode);
+        newExecBlock.setPlan(parentLimit);
+
+        int srcPID = block.getPlan().getTopNodePid(0);
+        int targetPID = newExecBlock.getInputContext().getScanNodes()[0].getPID();
+        DataChannel newChannel = new DataChannel(block, newExecBlock, srcPID, targetPID, HASH_PARTITION, 1, node.getOutSchema());
         newChannel.setPartitionKey(new Column[]{});
-        newChannel.setSchema(node.getOutSchema());
+//        newChannel.setSchema(node.getOutSchema());
         newChannel.setStoreType(storeType);
 
-        ScanNode scanNode = buildInputExecutor(plan, newChannel);
-        LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan(), node);
-        parentLimit.setChild(scanNode);
-        newExecBlock.setPlan(parentLimit);
         context.plan.addConnect(newChannel);
         context.execBlockMap.put(parentLimit.getPID(), newExecBlock);
         node = parentLimit;
@@ -497,9 +520,11 @@ public class GlobalPlanner {
       LogicalNode child = super.visitFilter(context, plan, node, stack);
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-      node.setChild(execBlock.getPlan());
-      node.setInSchema(execBlock.getPlan().getOutSchema());
-      execBlock.setPlan(node);
+//      node.setChild(execBlock.getPlan());
+      node.setInSchema(execBlock.getPlan().getOutSchema(0));
+//      execBlock.setPlan(node);
+      ExecutionPlan executionPlan = execBlock.getPlan();
+      executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
       context.execBlockMap.put(node.getPID(), execBlock);
 
       return node;
@@ -552,13 +577,17 @@ public class GlobalPlanner {
       }
 
       for (ExecutionBlock childBlocks : unionBlocks) {
-        UnionNode union = (UnionNode) childBlocks.getPlan();
-        queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
-        queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
+        ExecutionPlan executionPlan = childBlocks.getPlan();
+        LogicalNode unionCandidate = executionPlan.getTopNode(0);
+        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.LEFT).getPID()));
+        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.RIGHT).getPID()));
       }
 
+      int targetPID = execBlock.getInputContext().getScanNodes()[0].getPID();
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
-        DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+        LogicalNode topNode = childBlocks.getPlan().getTopNode(0);
+        int srcPID = topNode.getPID();
+        DataChannel channel = new DataChannel(childBlocks, execBlock, srcPID, targetPID, NONE_PARTITION, 1, topNode.getOutSchema());
         channel.setStoreType(storeType);
         context.plan.addConnect(channel);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 725e3f7..bef025a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -93,7 +93,7 @@ public class MasterPlan {
   }
 
   public ExecutionBlock newExecutionBlock() {
-    ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(),
+    ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(), plan.getPidFactory(),
         (LogicalRootNode) plan.getRootBlock().getRoot());
     execBlockMap.put(newExecBlock.getId(), newExecBlock);
     return newExecBlock;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index c9379ca..6c5693f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -69,8 +69,8 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
     this.comp = new TupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);
-    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
-        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+    this.meta = CatalogUtil.newTableMeta(context.getOutgoingChannels() != null && context.getOutgoingChannels().get(0) != null
+        ? context.getOutgoingChannels().get(0).getStoreType() : CatalogProtos.StoreType.RAW);
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);
     this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index c81d5d9..aee351b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -65,7 +65,7 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     Preconditions.checkArgument(plan.hasPartitionKey());
     this.plan = plan;
-    this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
+    this.meta = CatalogUtil.newTableMeta(context.getOutgoingChannels().get(0).getStoreType());
     // about the partitions
     this.numPartitions = this.plan.getNumPartitions();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
index e908863..e208d24 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
 import org.apache.tajo.engine.planner.logical.*;
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ public class TestExecutionPlan {
 
     groupbyNode.setChild(scanNode);
 
-    ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(4));
+    ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(4));
     plan.addPlan(groupbyNode);
 
     String json = plan.toJson();
@@ -82,7 +83,7 @@ public class TestExecutionPlan {
     joinNode.setLeftChild(scanNode);
     joinNode.setRightChild(scanNode2);
 
-    ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(5));
+    ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(5));
     plan.addPlan(root1);
     plan.addPlan(root2);
     assertEquals(1, plan.getInputContext().size());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index d816bb1..e8eaccb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -137,7 +137,8 @@ public class TestBNLJoinExec {
   @Test
   public final void testBNLCrossJoin() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
@@ -152,7 +153,7 @@ public class TestBNLJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -173,7 +174,8 @@ public class TestBNLJoinExec {
   @Test
   public final void testBNLInnerJoin() throws IOException, PlanningException {
     Expr context = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
     FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
@@ -191,7 +193,7 @@ public class TestBNLJoinExec {
         merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 2b7f745..0df1746 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -169,7 +169,7 @@ public class TestBSTIndexExec {
     LogicalNode rootNode = optimizer.optimize(plan);
 
     TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
     exec = ((PhysicalRootExec)exec).getChild(0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 6aa0eef..047e3ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -121,7 +121,7 @@ public class TestExternalSortExec {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index e6aaddf..b11582a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -21,9 +21,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.engine.planner.global.ExecutionPlan;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -34,12 +31,15 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -250,7 +250,8 @@ public class TestFullOuterHashJoinExec {
   @Test
   public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -264,7 +265,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -290,7 +291,8 @@ public class TestFullOuterHashJoinExec {
   @Test
   public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -304,7 +306,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -329,7 +331,8 @@ public class TestFullOuterHashJoinExec {
   @Test
   public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -343,7 +346,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -369,7 +372,8 @@ public class TestFullOuterHashJoinExec {
   @Test
   public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -384,7 +388,7 @@ public class TestFullOuterHashJoinExec {
         workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index f474979..675927e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -43,6 +43,7 @@ import org.apache.tajo.util.TUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import sun.util.logging.resources.logging_it;
 
 import java.io.IOException;
 
@@ -293,7 +294,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -307,7 +309,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -331,7 +333,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -345,7 +348,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -369,7 +372,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -383,7 +387,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -407,7 +411,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -421,7 +426,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -448,7 +453,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[4]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -463,7 +469,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -488,7 +494,8 @@ public class TestFullOuterMergeJoinExec {
   @Test
   public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[5]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -503,7 +510,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index c3983fd..4bcac45 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -158,7 +158,7 @@ public class TestHashAntiJoinExec {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 68e5ef2..722a0e7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -136,7 +136,8 @@ public class TestHashJoinExec {
   @Test
   public final void testHashInnerJoin() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
@@ -151,7 +152,7 @@ public class TestHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index b4597c7..8caaa5a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -163,7 +163,7 @@ public class TestHashSemiJoinExec {
     System.out.println(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 8082b07..25b5128 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -251,7 +251,8 @@ public class TestLeftOuterHashJoinExec {
   @Test
   public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
@@ -265,7 +266,7 @@ public class TestLeftOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -297,9 +298,10 @@ public class TestLeftOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -339,9 +341,10 @@ public class TestLeftOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -382,9 +385,10 @@ public class TestLeftOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -425,9 +429,10 @@ public class TestLeftOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[4]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 54a3c49..3cc4362 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
@@ -251,9 +252,10 @@ public class TestLeftOuterNLJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -293,9 +295,10 @@ public class TestLeftOuterNLJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -335,9 +338,10 @@ public class TestLeftOuterNLJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -380,9 +384,10 @@ public class TestLeftOuterNLJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -424,9 +429,10 @@ public class TestLeftOuterNLJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr context =  analyzer.parse(QUERIES[4]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 57afef5..04121a7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -166,7 +166,7 @@ public class TestMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(root);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index a5270a3..85f972f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
@@ -151,9 +152,10 @@ public class TestNLJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -182,10 +184,11 @@ public class TestNLJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr context =  analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(context);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     //LogicalOptimizer.optimize(ctx, plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 078d24d..48211b4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -194,7 +194,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode =plan.getRootBlock().getRoot();
     optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -225,7 +225,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode =plan.getRootBlock().getRoot();
     optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -255,7 +255,7 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -288,7 +288,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -320,7 +320,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(plan.getRootBlock().getRoot());
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -386,7 +386,7 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -429,7 +429,7 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -483,7 +483,7 @@ public class TestPhysicalPlanner {
 
     FileSystem fs = sm.getFileSystem();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -543,7 +543,7 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType());
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -601,7 +601,7 @@ public class TestPhysicalPlanner {
       }
     }
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -639,7 +639,7 @@ public class TestPhysicalPlanner {
       }
     }
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -663,7 +663,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -692,7 +692,7 @@ public class TestPhysicalPlanner {
     UnionNode union = new UnionNode(plan.newPID(), root.getChild(), clonePlan(plan, root.getChild()));
     root.setChild(union);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(root);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -741,7 +741,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -757,7 +757,7 @@ public class TestPhysicalPlanner {
     plan = planner.createPlan(expr);
     rootNode = optimizer.optimize(plan);
 
-    execPlan = new ExecutionPlan();
+    execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf, sm);
     exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -783,7 +783,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -814,7 +814,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -859,7 +859,7 @@ public class TestPhysicalPlanner {
     channels.add(channel);
     ctx.setOutgoingChannels(channels);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
@@ -952,7 +952,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -976,7 +976,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    execPlan = new ExecutionPlan();
+    execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf,sm);
     exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -1006,7 +1006,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -1030,7 +1030,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    execPlan = new ExecutionPlan();
+    execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     phyPlanner = new PhysicalPlannerImpl(conf,sm);
     exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 42a0cb7..13d1965 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.PlanningException;
@@ -229,9 +230,10 @@ public class TestRightOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -271,9 +273,10 @@ public class TestRightOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -313,9 +316,10 @@ public class TestRightOuterHashJoinExec {
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     Expr expr = analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 948680d..d7d360c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -294,7 +294,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuterMergeJoin0() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[0]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -308,7 +309,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -331,7 +332,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuter_MergeJoin1() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[1]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -345,7 +347,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -367,7 +369,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuterMergeJoin2() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[2]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -381,7 +384,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -403,7 +406,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuter_MergeJoin3() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[3]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -417,7 +421,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -440,7 +444,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuter_MergeJoin4() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[4]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -455,7 +460,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -477,7 +482,8 @@ public class TestRightOuterMergeJoinExec {
   @Test
   public final void testRightOuterMergeJoin5() throws IOException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[5]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
+    LogicalPlan logicalPlan = planner.createPlan(expr);
+    LogicalNode plan = logicalPlan.getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
@@ -493,7 +499,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(logicalPlan.getPidFactory());
     execPlan.addPlan(plan);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index fe90545..fb4d71e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -119,7 +119,7 @@ public class TestSortExec {
     LogicalPlan plan = planner.createPlan(context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/53e84643/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index dd6ff01..d722e7c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -36,10 +36,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.IndexedStoreExec;
-import org.apache.tajo.engine.planner.physical.MemSortExec;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.planner.physical.ProjectionExec;
+import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -142,7 +139,7 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
@@ -258,7 +255,7 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    ExecutionPlan execPlan = new ExecutionPlan();
+    ExecutionPlan execPlan = new ExecutionPlan(plan.getPidFactory());
     execPlan.addPlan(rootNode);
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);