You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/23 14:15:45 UTC

[2/2] git commit: TAJO-194: LogicalNode should have an identifier to distinguish each logical node instance. (hyunsik)

TAJO-194: LogicalNode should have an identifier to distinguish each logical node instance. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fc018de8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fc018de8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fc018de8

Branch: refs/heads/master
Commit: fc018de823dd34d769eb73f3c42e089b0d992b81
Parents: 44bb57d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 23 21:04:02 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 23 21:04:02 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/tajo/engine/planner/InsertNode.java  |   8 +-
 .../apache/tajo/engine/planner/LogicalPlan.java |  44 +++---
 .../tajo/engine/planner/LogicalPlanner.java     |  91 +++++------
 .../engine/planner/PhysicalPlannerImpl.java     |  11 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  22 ---
 .../tajo/engine/planner/TargetListManager.java  |   2 +-
 .../tajo/engine/planner/logical/BinaryNode.java |  13 +-
 .../engine/planner/logical/CreateTableNode.java |   4 +-
 .../engine/planner/logical/DropTableNode.java   |   4 +-
 .../engine/planner/logical/EvalExprNode.java    |   4 +-
 .../tajo/engine/planner/logical/ExceptNode.java |   8 +-
 .../engine/planner/logical/GroupbyNode.java     |  13 +-
 .../engine/planner/logical/IndexScanNode.java   |   4 +-
 .../engine/planner/logical/IntersectNode.java   |   9 +-
 .../tajo/engine/planner/logical/JoinNode.java   |   8 +-
 .../tajo/engine/planner/logical/LimitNode.java  |   8 +-
 .../engine/planner/logical/LogicalNode.java     |  24 +--
 .../engine/planner/logical/LogicalRootNode.java |   4 +-
 .../tajo/engine/planner/logical/NodeType.java   |   2 -
 .../tajo/engine/planner/logical/PipeType.java   |  24 ---
 .../engine/planner/logical/ProjectionNode.java  |  12 +-
 .../engine/planner/logical/ReceiveNode.java     | 126 ----------------
 .../engine/planner/logical/RelationNode.java    |   4 +-
 .../engine/planner/logical/RepartitionType.java |  25 ----
 .../tajo/engine/planner/logical/ScanNode.java   |  12 +-
 .../engine/planner/logical/SelectionNode.java   |   8 +-
 .../tajo/engine/planner/logical/SendNode.java   | 149 -------------------
 .../tajo/engine/planner/logical/SortNode.java   |  15 +-
 .../engine/planner/logical/StoreIndexNode.java  |   4 +-
 .../engine/planner/logical/StoreTableNode.java  |   8 +-
 .../planner/logical/TableSubQueryNode.java      |   4 +-
 .../tajo/engine/planner/logical/UnaryNode.java  |   8 +-
 .../tajo/engine/planner/logical/UnionNode.java  |   8 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   4 +-
 .../org/apache/tajo/master/GlobalPlanner.java   |  16 +-
 .../apache/tajo/master/GlobalPlannerUtils.java  |  44 ------
 .../java/org/apache/tajo/util/IndexUtil.java    |   5 +-
 .../engine/planner/logical/TestReceiveNode.java |  61 --------
 .../engine/planner/logical/TestSendNode.java    |  49 ------
 .../planner/physical/TestMergeJoinExec.java     |  14 +-
 .../planner/physical/TestPhysicalPlanner.java   |   2 +-
 42 files changed, 170 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d9fc2b..223ffa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-194: LogicalNode should have an identifier to distinguish each
+    logical node instance. (hyunsik)
+
     TAJO-183: Creating too many TableMetaProto objects might lead a potential 
     memory leak. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ed13fb2..ae88630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -39,16 +39,16 @@ public class InsertNode extends LogicalNode implements Cloneable {
   @Expose private LogicalNode subQuery;
 
 
-  public InsertNode(TableDesc desc, LogicalNode subQuery) {
-    super(NodeType.INSERT);
+  public InsertNode(int pid, TableDesc desc, LogicalNode subQuery) {
+    super(pid, NodeType.INSERT);
     this.targetTableDesc = desc;
     this.subQuery = subQuery;
     this.setInSchema(subQuery.getOutSchema());
     this.setOutSchema(subQuery.getOutSchema());
   }
 
-  public InsertNode(Path location, LogicalNode subQuery) {
-    super(NodeType.INSERT);
+  public InsertNode(int pid, Path location, LogicalNode subQuery) {
+    super(pid, NodeType.INSERT);
     this.path = location;
     this.subQuery = subQuery;
     this.setInSchema(subQuery.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index c29f60c..7e7b45e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -40,18 +40,16 @@ public class LogicalPlan {
   public static final char VIRTUAL_TABLE_PREFIX='@';
   /** it indicates the root block */
   public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
-  /** it indicates a table itself */
-  public static final String TABLE_SELF = VIRTUAL_TABLE_PREFIX + "SELF";
-
-  public static final String ANONYMOUS_TABLE_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
-  public static Integer anonymousBlockId = 0;
-  public static Integer anonymousColumnId = 0;
+  public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
+  private int nextPid = 0;
+  private Integer noNameBlockId = 0;
+  private Integer noNameColumnId = 0;
 
   /** a map from between a block name to a block plan */
   private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
-  private Map<LogicalNode, QueryBlock> queryBlockByNode = new HashMap<LogicalNode, QueryBlock>();
+  private Map<Integer, LogicalNode> nodeMap = new HashMap<Integer, LogicalNode>();
+  private Map<Integer, QueryBlock> queryBlockByPID = new HashMap<Integer, QueryBlock>();
   private SimpleDirectedGraph<String, BlockEdge> queryBlockGraph = new SimpleDirectedGraph<String, BlockEdge>();
-  private Set<LogicalNode> visited = new HashSet<LogicalNode>();
 
   public LogicalPlan(LogicalPlanner planner) {
     this.planner = planner;
@@ -69,12 +67,16 @@ public class LogicalPlan {
     return block;
   }
 
-  public QueryBlock newAnonymousBlock() {
-    return newAndGetBlock(ANONYMOUS_TABLE_PREFIX + (anonymousBlockId++));
+  public int newPID() {
+    return nextPid++;
+  }
+
+  public QueryBlock newNoNameBlock() {
+    return newAndGetBlock(NONAME_BLOCK_PREFIX + (noNameBlockId++));
   }
 
-  public String newAnonymousColumnName() {
-    return "?_" + (anonymousColumnId ++);
+  public String newNonameColumnName() {
+    return "?_" + (noNameColumnId++);
   }
 
   /**
@@ -95,17 +97,17 @@ public class LogicalPlan {
   }
 
   public QueryBlock getBlock(LogicalNode node) {
-    return queryBlockByNode.get(node);
+    return queryBlockByPID.get(node.getPID());
   }
 
   public void removeBlock(QueryBlock block) {
     queryBlocks.remove(block.getName());
-    List<LogicalNode> tobeRemoved = new ArrayList<LogicalNode>();
-    for (Map.Entry<LogicalNode, QueryBlock> entry : queryBlockByNode.entrySet()) {
+    List<Integer> tobeRemoved = new ArrayList<Integer>();
+    for (Map.Entry<Integer, QueryBlock> entry : queryBlockByPID.entrySet()) {
       tobeRemoved.add(entry.getKey());
     }
-    for (LogicalNode rn : tobeRemoved) {
-      queryBlockByNode.remove(rn);
+    for (Integer rn : tobeRemoved) {
+      queryBlockByPID.remove(rn);
     }
   }
 
@@ -353,7 +355,7 @@ public class LogicalPlan {
         LogicalRootNode rootNode = (LogicalRootNode) blockRoot;
         rootType = rootNode.getChild().getType();
       }
-      queryBlockByNode.put(blockRoot, this);
+      queryBlockByPID.put(blockRoot.getPID(), this);
     }
 
     public <T extends LogicalNode> T getRoot() {
@@ -510,10 +512,12 @@ public class LogicalPlan {
     }
 
     public boolean postVisit(LogicalNode node, Stack<OpType> path) {
-      if (visited.contains(node)) {
+      if (nodeMap.containsKey(node.getPID())) {
         return false;
       }
 
+      nodeMap.put(node.getPID(), node);
+
       // if an added operator is a relation, add it to relation set.
       switch (node.getType()) {
         case STORE:
@@ -721,7 +725,7 @@ public class LogicalPlan {
             } else {
               Target [] addedTargets = new Target[aggrFunctions.size()];
               for (int i = 0; i < aggrFunctions.size(); i++) {
-                Target aggrFunctionTarget = new Target(aggrFunctions.get(i), newAnonymousColumnName());
+                Target aggrFunctionTarget = new Target(aggrFunctions.get(i), newNonameColumnName());
                 addedTargets[i] = aggrFunctionTarget;
                 EvalTreeUtil.replace(havingCondition, aggrFunctions.get(i),
                     new FieldEval(aggrFunctionTarget.getColumnSchema()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 6dd3e77..52f0547 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -96,7 +96,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     PlanContext context = new PlanContext(plan, rootBlock);
     subroot = visitChild(context, stack, expr);
 
-    LogicalRootNode root = new LogicalRootNode();
+    LogicalRootNode root = new LogicalRootNode(plan.newPID());
     root.setInSchema(subroot.getOutSchema());
     root.setOutSchema(subroot.getOutSchema());
     root.setChild(subroot);
@@ -140,13 +140,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
   }
 
-  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr)
+      throws PlanningException {
     QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
     PlanContext newContext = new PlanContext(context.plan, newBlock);
     Stack<OpType> newStack = new Stack<OpType>();
     LogicalNode child = visitChild(newContext, newStack, expr.getSubQuery());
     context.plan.connectBlocks(newContext.block, context.block, BlockType.TableSubQuery);
-    return new TableSubQueryNode(expr.getName(), child);
+    return new TableSubQueryNode(context.plan.newPID(), expr.getName(), child);
   }
 
 
@@ -162,9 +163,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     ScanNode scanNode;
     if (relation.hasAlias()) {
-      scanNode = new ScanNode(desc, relation.getAlias());
+      scanNode = new ScanNode(context.plan.newPID(), desc, relation.getAlias());
     } else {
-      scanNode = new ScanNode(desc);
+      scanNode = new ScanNode(context.plan.newPID(), desc);
     }
 
     return scanNode;
@@ -186,7 +187,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       for (int i = 1; i < relations.size(); i++) {
         left = current;
         right = visitChild(context, stack, relations.getRelations()[i]);
-        current = createCatasianProduct(left, right);
+        current = createCatasianProduct(context.plan, left, right);
       }
     }
 
@@ -207,7 +208,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     stack.pop();
 
     // Phase 3: build this plan
-    JoinNode joinNode = new JoinNode(join.getJoinType(), left, right);
+    JoinNode joinNode = new JoinNode(plan.newPID(), join.getJoinType(), left, right);
 
     // Set A merged input schema
     Schema merged;
@@ -255,8 +256,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return njQual;
   }
 
-  private static LogicalNode createCatasianProduct(LogicalNode left, LogicalNode right) {
-    JoinNode join = new JoinNode(JoinType.CROSS, left, right);
+  private static LogicalNode createCatasianProduct(LogicalPlan plan, LogicalNode left, LogicalNode right) {
+    JoinNode join = new JoinNode(plan.newPID(), JoinType.CROSS, left, right);
     Schema joinSchema = SchemaUtil.merge(
         join.getLeftChild().getOutSchema(),
         join.getRightChild().getOutSchema());
@@ -319,27 +320,27 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     QueryBlock block = context.block;
 
     // 2. Build Child Plans
-    PlanContext leftContext = new PlanContext(plan, plan.newAnonymousBlock());
+    PlanContext leftContext = new PlanContext(plan, plan.newNoNameBlock());
     Stack<OpType> leftStack = new Stack<OpType>();
     LogicalNode left = visitChild(leftContext, leftStack, setOperation.getLeft());
-    TableSubQueryNode leftSubQuery = new TableSubQueryNode(leftContext.block.getName(), left);
+    TableSubQueryNode leftSubQuery = new TableSubQueryNode(plan.newPID(), leftContext.block.getName(), left);
     context.plan.connectBlocks(leftContext.block, context.block, BlockType.TableSubQuery);
 
-    PlanContext rightContext = new PlanContext(plan, plan.newAnonymousBlock());
+    PlanContext rightContext = new PlanContext(plan, plan.newNoNameBlock());
     Stack<OpType> rightStack = new Stack<OpType>();
     LogicalNode right = visitChild(rightContext, rightStack, setOperation.getRight());
-    TableSubQueryNode rightSubQuery = new TableSubQueryNode(rightContext.block.getName(), right);
+    TableSubQueryNode rightSubQuery = new TableSubQueryNode(plan.newPID(), rightContext.block.getName(), right);
     context.plan.connectBlocks(rightContext.block, context.block, BlockType.TableSubQuery);
 
     verifySetStatement(setOperation.getType(), leftContext.block, rightContext.block);
 
     BinaryNode setOp;
     if (setOperation.getType() == OpType.Union) {
-      setOp = new UnionNode(leftSubQuery, rightSubQuery);
+      setOp = new UnionNode(plan.newPID(), leftSubQuery, rightSubQuery);
     } else if (setOperation.getType() == OpType.Except) {
-      setOp = new ExceptNode(leftSubQuery, rightSubQuery);
+      setOp = new ExceptNode(plan.newPID(), leftSubQuery, rightSubQuery);
     } else if (setOperation.getType() == OpType.Intersect) {
-      setOp = new IntersectNode(leftSubQuery, rightSubQuery);
+      setOp = new IntersectNode(plan.newPID(), leftSubQuery, rightSubQuery);
     } else {
       throw new VerifyException("Invalid Type: " + setOperation.getType());
     }
@@ -395,7 +396,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // 3. build this plan:
     EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
-    SelectionNode selectionNode = new SelectionNode(searchCondition);
+    SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
 
     // 4. set child plan, update input/output schemas:
     selectionNode.setChild(child);
@@ -435,7 +436,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             groupElements[i].getType(),
             annotateGroupingColumn(plan, block.getName(), groupElements[i].getColumns(), null));
       }
-      GroupbyNode groupingNode = new GroupbyNode(annotatedElements[0].getColumns());
+      GroupbyNode groupingNode = new GroupbyNode(plan.newPID(), annotatedElements[0].getColumns());
       if (aggregation.hasHavingCondition()) {
         groupingNode.setHavingCondition(
             createEvalTree(plan, block, aggregation.getHavingCondition()));
@@ -471,7 +472,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     UnionNode union;
     try {
       if ((cuboids.size() - idx) > 2) {
-        GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
+        GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
         Target[] clone = cloneTargets(block.getCurrentTargets());
 
         g1.setTargets(clone);
@@ -481,13 +482,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         g1.setOutSchema(outSchema);
 
         LogicalNode right = createGroupByUnion(plan, block, subNode, cuboids, idx+1);
-        union = new UnionNode(g1, right);
+        union = new UnionNode(plan.newPID(), g1, right);
         union.setInSchema(g1.getOutSchema());
         union.setOutSchema(g1.getOutSchema());
 
         return union;
       } else {
-        GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
+        GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
         Target[] clone = cloneTargets(block.getCurrentTargets());
         g1.setTargets(clone);
         g1.setChild((LogicalNode) subNode.clone());
@@ -495,14 +496,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         Schema outSchema = getProjectedSchema(plan, clone);
         g1.setOutSchema(outSchema);
 
-        GroupbyNode g2 = new GroupbyNode(cuboids.get(idx+1));
+        GroupbyNode g2 = new GroupbyNode(plan.newPID(), cuboids.get(idx+1));
         clone = cloneTargets(block.getCurrentTargets());
         g2.setTargets(clone);
         g2.setChild((LogicalNode) subNode.clone());
         g2.setInSchema(g1.getChild().getOutSchema());
         outSchema = getProjectedSchema(plan, clone);
         g2.setOutSchema(outSchema);
-        union = new UnionNode(g1, g2);
+        union = new UnionNode(plan.newPID(), g1, g2);
         union.setInSchema(g1.getOutSchema());
         union.setOutSchema(g1.getOutSchema());
 
@@ -575,7 +576,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // 2. Build Child Plans:
     stack.push(OpType.Sort);
     LogicalNode child = visitChild(context, stack, sort.getChild());
-    child = insertGroupbyNodeIfUnresolved(block, child, stack);
+    child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
     stack.pop();
 
     // 3. Build this plan:
@@ -587,7 +588,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(),
           sortSpecs[i].isNullFirst());
     }
-    SortNode sortNode = new SortNode(annotatedSortSpecs);
+    SortNode sortNode = new SortNode(context.plan.newPID(), annotatedSortSpecs);
 
     // 4. Set Child Plan, Update Input/Output Schemas:
     sortNode.setChild(child);
@@ -611,7 +612,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // build limit plan
     EvalNode firstFetchNum = createEvalTree(plan, block, limit.getFetchFirstNum());
     firstFetchNum.eval(null, null, null);
-    LimitNode limitNode = new LimitNode(firstFetchNum.terminate(null).asInt8());
+    LimitNode limitNode = new LimitNode(context.plan.newPID(), firstFetchNum.terminate(null).asInt8());
 
     // set child plan and update input/output schemas.
     limitNode.setChild(child);
@@ -638,7 +639,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     if (!projection.hasChild()) {
       EvalExprNode evalOnly =
-          new EvalExprNode(annotateTargets(plan, block, projection.getTargets()));
+          new EvalExprNode(context.plan.newPID(), annotateTargets(plan, block, projection.getTargets()));
       evalOnly.setOutSchema(getProjectedSchema(plan, evalOnly.getExprs()));
       block.setProjectionNode(evalOnly);
       for (int i = 0; i < evalOnly.getTargets().length; i++) {
@@ -650,7 +651,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // 2: Build Child Plans
     stack.push(OpType.Projection);
     LogicalNode child = visitChild(context, stack, projection.getChild());
-    child = insertGroupbyNodeIfUnresolved(block, child, stack);
+    child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
     stack.pop();
 
     // All targets must be evaluable before the projection.
@@ -659,9 +660,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     ProjectionNode projectionNode;
     if (projection.isAllProjected()) {
-      projectionNode = new ProjectionNode(PlannerUtil.schemaToTargets(child.getOutSchema()));
+      projectionNode = new ProjectionNode(context.plan.newPID(), PlannerUtil.schemaToTargets(child.getOutSchema()));
     } else {
-      projectionNode = new ProjectionNode(block.getCurrentTargets());
+      projectionNode = new ProjectionNode(context.plan.newPID(), block.getCurrentTargets());
     }
 
     block.setProjectionNode(projectionNode);
@@ -674,7 +675,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     } else {
       if (projection.isDistinct()) {
         Schema outSchema = projectionNode.getOutSchema();
-        GroupbyNode dupRemoval = new GroupbyNode(outSchema.toArray());
+        GroupbyNode dupRemoval = new GroupbyNode(plan.newPID(), outSchema.toArray());
         dupRemoval.setTargets(block.getTargetListManager().getTargets());
         dupRemoval.setInSchema(child.getOutSchema());
         dupRemoval.setOutSchema(outSchema);
@@ -690,11 +691,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * Insert a group-by operator before a sort or a projection operator.
    * It is used only when a group-by clause is not given.
    */
-  private LogicalNode insertGroupbyNodeIfUnresolved(QueryBlock block,
+  private LogicalNode insertGroupbyNodeIfUnresolved(LogicalPlan plan, QueryBlock block,
                                                     LogicalNode child, Stack<OpType> stack) throws PlanningException {
 
     if (!block.isGroupingResolved()) {
-      GroupbyNode groupbyNode = new GroupbyNode(new Column[] {});
+      GroupbyNode groupbyNode = new GroupbyNode(plan.newPID(), new Column[] {});
       groupbyNode.setTargets(block.getCurrentTargets());
       groupbyNode.setChild(child);
       groupbyNode.setInSchema(child.getOutSchema());
@@ -731,7 +732,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       stack.add(OpType.CreateTable);
       LogicalNode subQuery = visitChild(context, stack, expr.getSubQuery());
       stack.pop();
-      StoreTableNode storeNode = new StoreTableNode(tableName);
+      StoreTableNode storeNode = new StoreTableNode(context.plan.newPID(), tableName);
       storeNode.setCreateTable();
       storeNode.setChild(subQuery);
 
@@ -759,7 +760,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
       return storeNode;
     } else {
-      CreateTableNode createTableNode = new CreateTableNode(expr.getTableName(),
+      CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
           convertTableElementsSchema(expr.getTableElements()));
 
       if (expr.isExternal()) {
@@ -836,25 +837,25 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return column;
   }
 
-  protected LogicalNode visitInsert(PlanContext ctx, Stack<OpType> stack, Insert expr) throws PlanningException {
+  protected LogicalNode visitInsert(PlanContext context, Stack<OpType> stack, Insert expr) throws PlanningException {
     stack.push(expr.getType());
-    QueryBlock newQueryBlock = ctx.plan.newAnonymousBlock();
-    PlanContext newContext = new PlanContext(ctx.plan, newQueryBlock);
+    QueryBlock newQueryBlock = context.plan.newNoNameBlock();
+    PlanContext newContext = new PlanContext(context.plan, newQueryBlock);
     Stack<OpType> subStack = new Stack<OpType>();
     LogicalNode subQuery = visitChild(newContext, subStack, expr.getSubQuery());
-    ctx.plan.connectBlocks(newQueryBlock, ctx.block, BlockType.TableSubQuery);
+    context.plan.connectBlocks(newQueryBlock, context.block, BlockType.TableSubQuery);
     stack.pop();
 
     InsertNode insertNode = null;
     if (expr.hasTableName()) {
       TableDesc desc = catalog.getTableDesc(expr.getTableName());
-      ctx.block.addRelation(new ScanNode(desc));
+      context.block.addRelation(new ScanNode(context.plan.newPID(), desc));
 
       Schema targetSchema = new Schema();
       if (expr.hasTargetColumns()) {
         String [] targetColumnNames = expr.getTargetColumns();
         for (int i = 0; i < targetColumnNames.length; i++) {
-          Column targetColumn = ctx.plan.resolveColumn(ctx.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
+          Column targetColumn = context.plan.resolveColumn(context.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
           targetSchema.addColumn(targetColumn);
         }
       } else {
@@ -865,13 +866,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       }
 
       ensureDomains(targetSchema, subQuery.getOutSchema());
-      insertNode = new InsertNode(desc, subQuery);
+      insertNode = new InsertNode(context.plan.newPID(), desc, subQuery);
       insertNode.setTargetSchema(targetSchema);
       insertNode.setOutSchema(targetSchema);
     }
 
     if (expr.hasLocation()) {
-      insertNode = new InsertNode(new Path(expr.getLocation()), subQuery);
+      insertNode = new InsertNode(context.plan.newPID(), new Path(expr.getLocation()), subQuery);
       if (expr.hasStorageType()) {
         insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
       }
@@ -906,7 +907,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
   @Override
   public LogicalNode visitDropTable(PlanContext context, Stack<OpType> stack, DropTable dropTable) {
-    DropTableNode dropTableNode = new DropTableNode(dropTable.getTableName());
+    DropTableNode dropTableNode = new DropTableNode(context.plan.newPID(), dropTable.getTableName());
     return dropTableNode;
   }
 
@@ -1180,7 +1181,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       if (t.hasAlias()) {
         name = t.getAlias();
       } else if (t.getEvalTree().getName().equals("?")) {
-        name = plan.newAnonymousColumnName();
+        name = plan.newNonameColumnName();
       } else {
         name = t.getEvalTree().getName();
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index b118852..4813fd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -45,9 +45,12 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
 
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
+  private static final int UNGENERATED_PID = -1;
+
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
 
+
   public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
     this.conf = conf;
     this.sm = sm;
@@ -76,7 +79,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
                                            PhysicalExec execPlan) throws IOException {
     DataChannel channel = context.getDataChannel();
-    StoreTableNode storeTableNode = new StoreTableNode(channel.getTargetId().toString());
+    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
     storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
     storeTableNode.setInSchema(execPlan.getSchema());
     storeTableNode.setOutSchema(execPlan.getSchema());
@@ -220,10 +223,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
             joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
         ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
-            new SortNode(sortSpecs[0], outer.getSchema(), outer.getSchema()),
+            new SortNode(UNGENERATED_PID, sortSpecs[0], outer.getSchema(), outer.getSchema()),
             outer);
         ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
-            new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
+            new SortNode(UNGENERATED_PID, sortSpecs[1], inner.getSchema(), inner.getSchema()),
             inner);
 
         LOG.info("The planner chooses [Merge Join]");
@@ -295,7 +298,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         for (int i = 0; i < grpColumns.length; i++) {
           specs[i] = new SortSpec(grpColumns[i], true, false);
         }
-        SortNode sortNode = new SortNode(specs);
+        SortNode sortNode = new SortNode(UNGENERATED_PID, specs);
         sortNode.setInSchema(subOp.getSchema());
         sortNode.setOutSchema(subOp.getSchema());
         // SortExec sortExec = new SortExec(sortNode, child);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 2c9a5eb..7b39d26 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -66,21 +66,6 @@ public class PlannerUtil {
     return tableNames;
   }
   
-  public static LogicalNode insertNode(LogicalNode parent, LogicalNode newNode) {
-    Preconditions.checkArgument(parent instanceof UnaryNode);
-    Preconditions.checkArgument(newNode instanceof UnaryNode);
-    
-    UnaryNode p = (UnaryNode) parent;
-    LogicalNode c = p.getChild();
-    UnaryNode m = (UnaryNode) newNode;
-    m.setInSchema(c.getOutSchema());
-    m.setOutSchema(c.getOutSchema());
-    m.setChild(c);
-    p.setChild(m);
-    
-    return p;
-  }
-  
   /**
    * Delete the logical node from a plan.
    *
@@ -191,13 +176,6 @@ public class PlannerUtil {
     return child;
   }
   
-  private static LogicalNode insertStore(LogicalNode parent, String tableName) {
-    StoreTableNode store = new StoreTableNode(tableName);
-    insertNode(parent, store);
-    
-    return parent;
-  }
-  
   /**
    * Find the top logical node matched to type from the given node
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
index 970ab93..ccb64ce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
@@ -155,7 +155,7 @@ public class TargetListManager {
     if (t.hasAlias()) {
       name = t.getAlias();
     } else if (t.getEvalTree().getName().equals("?")) {
-      name = plan.newAnonymousColumnName();
+      name = plan.newNonameColumnName();
     } else {
       name = t.getEvalTree().getName();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
index e2724e7..37b0fdf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
@@ -27,16 +27,9 @@ import org.apache.tajo.json.GsonObject;
 public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonObject {
 	@Expose LogicalNode leftChild = null;
 	@Expose LogicalNode rightChild = null;
-	
-	public BinaryNode() {
-		super();
-	}
-	
-	/**
-	 * @param opType
-	 */
-	public BinaryNode(NodeType opType) {
-		super(opType);
+
+	public BinaryNode(int pid, NodeType nodeType) {
+		super(pid, nodeType);
 	}
 	
 	public <T extends LogicalNode> T getLeftChild() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index f7ff8ef..50656c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -36,8 +36,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   @Expose private Options options;
   @Expose private boolean external;
 
-  public CreateTableNode(String tableName, Schema schema) {
-    super(NodeType.CREATE_TABLE);
+  public CreateTableNode(int pid, String tableName, Schema schema) {
+    super(pid, NodeType.CREATE_TABLE);
     this.tableName = tableName;
     this.schema = schema;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
index 708c03a..490bfe7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
@@ -23,8 +23,8 @@ import org.apache.tajo.engine.planner.PlanString;
 public class DropTableNode extends LogicalNode {
   private String tableName;
 
-  public DropTableNode(String tableName) {
-    super(NodeType.DROP_TABLE);
+  public DropTableNode(int pid, String tableName) {
+    super(pid, NodeType.DROP_TABLE);
     this.tableName = tableName;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
index bb323dc..5bc41bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
@@ -29,8 +29,8 @@ import org.apache.tajo.util.TUtil;
 public class EvalExprNode extends LogicalNode implements Projectable {
   @Expose private Target[] exprs;
 
-  public EvalExprNode(Target[] exprs) {
-    super(NodeType.EXPRS);
+  public EvalExprNode(int pid, Target[] exprs) {
+    super(pid, NodeType.EXPRS);
     this.exprs = exprs;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
index fd3c0d0..0520214 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
 
 public class ExceptNode extends BinaryNode {
 
-  public ExceptNode() {
-    super(NodeType.EXCEPT);
-  }
-
-  public ExceptNode(LogicalNode outer, LogicalNode inner) {
-    this();
+  public ExceptNode(int pid, LogicalNode outer, LogicalNode inner) {
+    super(pid, NodeType.EXCEPT);
     setLeftChild(outer);
     setRightChild(inner);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 5c4e28e..4f9b6d7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -32,18 +32,13 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 	@Expose private EvalNode havingCondition = null;
 	@Expose private Target [] targets;
 	
-	public GroupbyNode() {
-		super();
-	}
-	
-	public GroupbyNode(final Column [] columns) {
-		super(NodeType.GROUP_BY);
+	public GroupbyNode(int pid, final Column [] columns) {
+		super(pid, NodeType.GROUP_BY);
 		this.columns = columns;
 	}
 	
-	public GroupbyNode(final Column [] columns, 
-	    final EvalNode havingCondition) {
-    this(columns);
+	public GroupbyNode(int pid, final Column [] columns, final EvalNode havingCondition) {
+    this(pid, columns);
     this.havingCondition = havingCondition;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
index 8ece090..7272630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
@@ -30,9 +30,9 @@ public class IndexScanNode extends ScanNode {
   @Expose private Schema keySchema = null;
   @Expose private Datum[] datum = null;
   
-  public IndexScanNode(ScanNode scanNode , 
+  public IndexScanNode(int pid, ScanNode scanNode ,
       Schema keySchema , Datum[] datum, SortSpec[] sortKeys ) {
-    super(scanNode.getTableDesc());
+    super(pid, scanNode.getTableDesc());
     setQual(scanNode.getQual());
     setInSchema(scanNode.getInSchema());
     setOutSchema(scanNode.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
index 0882e56..493698d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
@@ -24,13 +24,8 @@ package org.apache.tajo.engine.planner.logical;
 import org.apache.tajo.engine.planner.PlanString;
 
 public class IntersectNode extends BinaryNode {
-
-  public IntersectNode() {
-    super(NodeType.INTERSECT);
-  }
-
-  public IntersectNode(LogicalNode outer, LogicalNode inner) {
-    this();
+  public IntersectNode(int pid, LogicalNode outer, LogicalNode inner) {
+    super(pid, NodeType.INTERSECT);
     setLeftChild(outer);
     setRightChild(inner);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
index 45f40f9..58214ac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
@@ -32,14 +32,14 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable {
   @Expose private EvalNode joinQual;
   @Expose private Target[] targets;
 
-  public JoinNode(JoinType joinType, LogicalNode left) {
-    super(NodeType.JOIN);
+  public JoinNode(int pid, JoinType joinType, LogicalNode left) {
+    super(pid, NodeType.JOIN);
     this.joinType = joinType;
     setLeftChild(left);
   }
 
-  public JoinNode(JoinType joinType, LogicalNode left, LogicalNode right) {
-    super(NodeType.JOIN);
+  public JoinNode(int pid, JoinType joinType, LogicalNode left, LogicalNode right) {
+    super(pid, NodeType.JOIN);
     this.joinType = joinType;
     setLeftChild(left);
     setRightChild(right);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
index 697f1fc..2e1057d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -24,12 +24,8 @@ import org.apache.tajo.engine.planner.PlanString;
 public final class LimitNode extends UnaryNode implements Cloneable {
 	@Expose private long fetchFirstNum;
 
-	public LimitNode() {
-		super();
-	}
-
-  public LimitNode(long fetchFirstNum) {
-    super(NodeType.LIMIT);
+  public LimitNode(int pid, long fetchFirstNum) {
+    super(pid, NodeType.LIMIT);
     this.fetchFirstNum = fetchFirstNum;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
index 7d7e86d..042d9ef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
@@ -29,18 +29,21 @@ import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TUtil;
 
 public abstract class LogicalNode implements Cloneable, GsonObject {
+  @Expose private int pid;
   @Expose private NodeType type;
 	@Expose private Schema inputSchema;
 	@Expose	private Schema outputSchema;
 
 	@Expose	private double cost = 0;
-	
-	public LogicalNode() {
-	}
 
-	public LogicalNode(NodeType type) {
-		this.type = type;
+	public LogicalNode(int pid, NodeType type) {
+    this.pid = pid;
+    this.type = type;
 	}
+
+  public int getPID() {
+    return pid;
+  }
 	
 	public NodeType getType() {
 		return this.type;
@@ -79,7 +82,8 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 	  if (obj instanceof LogicalNode) {
 	    LogicalNode other = (LogicalNode) obj;
 
-      boolean eq = this.type == other.type;
+      boolean eq = this.pid == other.pid;
+      eq = eq && this.type == other.type; // chain with &&; a plain assignment here discards the pid comparison above
       eq = eq && TUtil.checkEquals(this.inputSchema, other.inputSchema);
       eq = eq && TUtil.checkEquals(this.outputSchema, other.outputSchema);
       eq = eq && this.cost == other.cost;
@@ -93,12 +97,10 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 	@Override
 	public Object clone() throws CloneNotSupportedException {
 	  LogicalNode node = (LogicalNode)super.clone();
+    node.pid = pid;
 	  node.type = type;
-	  node.inputSchema = 
-	      (Schema) (inputSchema != null ? inputSchema.clone() : null);
-	  node.outputSchema = 
-	      (Schema) (outputSchema != null ? outputSchema.clone() : null);
-	  
+	  node.inputSchema =  (Schema) (inputSchema != null ? inputSchema.clone() : null);
+	  node.outputSchema = (Schema) (outputSchema != null ? outputSchema.clone() : null);
 	  return node;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
index e5373db..9cad5af 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
@@ -21,8 +21,8 @@ package org.apache.tajo.engine.planner.logical;
 import org.apache.tajo.engine.planner.PlanString;
 
 public class LogicalRootNode extends UnaryNode implements Cloneable {
-  public LogicalRootNode() {
-    super(NodeType.ROOT);
+  public LogicalRootNode(int pid) {
+    super(pid, NodeType.ROOT);
   }
   
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 792fee8..44790ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -38,9 +38,7 @@ public enum NodeType {
   LIMIT(LimitNode.class),
   JOIN(JoinNode.class),
   PROJECTION(ProjectionNode.class),
-  RECEIVE(ReceiveNode.class),
   ROOT(LogicalRootNode.class),
-  SEND(SendNode.class),
   SCAN(ScanNode.class),
   SELECTION(SelectionNode.class),
   STORE(StoreTableNode.class),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
deleted file mode 100644
index 0355b21..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-public enum PipeType {
-  PULL,
-  PUSH
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
index 935f188..a5aa804 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
@@ -32,18 +32,10 @@ public class ProjectionNode extends UnaryNode implements Projectable {
   @Expose private boolean distinct = false;
 
   /**
-   * This method is for gson.
-   */
-  @SuppressWarnings("unused")
-	private ProjectionNode() {
-		super();
-	}
-
-  /**
    * @param targets they should be all evaluated ones.
    */
-	public ProjectionNode(Target [] targets) {		
-		super(NodeType.PROJECTION);
+	public ProjectionNode(int pid, Target [] targets) {
+		super(pid, NodeType.PROJECTION);
 		this.targets = targets;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
deleted file mode 100644
index a37ad5f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlanString;
-
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
-public final class ReceiveNode extends LogicalNode implements Cloneable {
-  @Expose private PipeType pipeType;
-  @Expose private RepartitionType repaType;
-  @Expose private Map<String, List<URI>> fetchMap;
-
-  private ReceiveNode() {
-    super(NodeType.RECEIVE);
-  }
-  public ReceiveNode(PipeType pipeType, RepartitionType shuffleType) {
-    this();
-    this.pipeType = pipeType;
-    this.repaType = shuffleType;
-    this.fetchMap = Maps.newHashMap();
-  }
-
-  public PipeType getPipeType() {
-    return this.pipeType;
-  }
-
-  public RepartitionType getRepartitionType() {
-    return this.repaType;
-  }
-  
-  public void addData(String name, URI uri) {
-    if (fetchMap.containsKey(name)) {
-      fetchMap.get(name).add(uri);
-    } else {
-      fetchMap.put(name, Lists.newArrayList(uri));
-    }
-  }
-  
-  public Collection<URI> getSrcURIs(String name) {
-    return Collections.unmodifiableList(fetchMap.get(name));
-  }
-
-  public Collection<Entry<String, List<URI>>> getAllDataSet() {
-    return Collections.unmodifiableSet(fetchMap.entrySet());
-  }
-
-
-  @Override
-  public PlanString getPlanString() {
-    return new PlanString("Receive ");
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof ReceiveNode) {
-      ReceiveNode other = (ReceiveNode) obj;
-      return pipeType == other.pipeType && repaType == other.repaType;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(pipeType, repaType);
-  }
-
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    ReceiveNode receive = (ReceiveNode) super.clone();
-    receive.pipeType = pipeType;
-    receive.repaType = repaType;
-    receive.fetchMap = Maps.newHashMap();
-    // Both String and URI are immutable, but a list is mutable.
-    for (Entry<String, List<URI>> entry : fetchMap.entrySet()) {
-      receive.fetchMap
-          .put(entry.getKey(), new ArrayList<URI>(entry.getValue()));
-    }
-
-    return receive;
-  }
-
-  @Override
-  public String toString() {
-    Gson gson = CoreGsonHelper.getPrettyInstance();
-    return gson.toJson(this);
-  }
-
-  @Override
-  public void preOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);
-  }
-  
-  @Override
-  public void postOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 583e472..6a30aa6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Schema;
 
 public abstract class RelationNode extends LogicalNode {
 
-  public RelationNode(NodeType nodeType) {
-    super(nodeType);
+  public RelationNode(int pid, NodeType nodeType) {
+    super(pid, nodeType);
     assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
deleted file mode 100644
index d317fb8..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-public enum RepartitionType {
-  NONE,
-  HASH,
-  SORT
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 3596e2d..04c7b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -34,20 +34,16 @@ public class ScanNode extends RelationNode implements Projectable {
   @Expose private Schema renamedSchema;
 	@Expose private EvalNode qual;
 	@Expose private Target[] targets;
-	
-	public ScanNode() {
-		super(NodeType.SCAN);
-	}
 
-  public ScanNode(TableDesc desc) {
-    super(NodeType.SCAN);
+  public ScanNode(int pid, TableDesc desc) {
+    super(pid, NodeType.SCAN);
     this.tableDesc = desc;
     this.setInSchema(tableDesc.getSchema());
     this.setOutSchema(tableDesc.getSchema());
   }
   
-	public ScanNode(TableDesc desc, String alias) {
-    this(desc);
+	public ScanNode(int pid, TableDesc desc, String alias) {
+    this(pid, desc);
     this.alias = PlannerUtil.normalizeTableName(alias);
     renamedSchema = getOutSchema();
     renamedSchema.setQualifier(this.alias, true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index 2435453..a12fc06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
 public class SelectionNode extends UnaryNode implements Cloneable {
 	@Expose private EvalNode qual;
 	
-	public SelectionNode() {
-		super();
-	}
-	
-	public SelectionNode(EvalNode qual) {
-		super(NodeType.SELECTION);
+	public SelectionNode(int pid, EvalNode qual) {
+		super(pid, NodeType.SELECTION);
 		setQual(qual);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
deleted file mode 100644
index 96e6fc5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlanString;
-import org.apache.tajo.util.TUtil;
-
-import java.net.URI;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This logical node means that the worker sends intermediate data to 
- * some destined one or more workers.
- */
-public class SendNode extends UnaryNode {
-  @Expose private PipeType pipeType;
-  @Expose private RepartitionType repaType;
-  /** This will be used for pipeType == PUSH. */
-  @Expose private Map<Integer, URI> destURIs;
-  @Expose private Column[] partitionKeys;
-  @Expose private int numPartitions;
-
-  private SendNode() {
-    super(NodeType.SEND);
-  }
-  
-  public SendNode(PipeType pipeType, RepartitionType repaType) {
-    this();
-    this.pipeType = pipeType;
-    this.repaType = repaType;
-    this.destURIs = Maps.newHashMap();
-  }
-
-  public PipeType getPipeType() {
-    return this.pipeType;
-  }
-  
-  public RepartitionType getRepartitionType() {
-    return this.repaType;
-  }
-  
-  public URI getDestURI(int partition) {
-    return this.destURIs.get(partition);
-  }
-  
-  public void setPartitionKeys(Column [] keys, int numPartitions) {
-    Preconditions.checkState(repaType != RepartitionType.NONE,
-        "Hash or Sort repartition only requires the partition keys");
-    Preconditions.checkArgument(keys.length > 0, 
-        "At least one partition key must be specified.");
-    Preconditions.checkArgument(numPartitions > 0,
-        "The number of partitions must be positive: %s", numPartitions);
-    this.partitionKeys = keys;
-    this.numPartitions = numPartitions;
-  }
-  
-  public boolean hasPartitionKeys() {
-    return this.partitionKeys != null;
-  }
-  
-  public Column [] getPartitionKeys() {
-    return this.partitionKeys;
-  }
-  
-  public int getPartitionsNum() {
-    return this.numPartitions;
-  }
-  
-  public Iterator<Entry<Integer, URI>> getAllDestURIs() {
-    return this.destURIs.entrySet().iterator();
-  }
-  
-  public void putDestURI(int partition, URI uri) {
-    this.destURIs.put(partition, uri);
-  }
-  
-  public void setDestURIs(Map<Integer, URI> destURIs) {
-    this.destURIs = destURIs;
-  }
-
-  @Override
-  public PlanString getPlanString() {
-    return null;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof SendNode) {
-      SendNode other = (SendNode) obj;
-      return pipeType == other.pipeType
-          && repaType == other.repaType
-          && TUtil.checkEquals(destURIs, other.destURIs);
-    } else {
-      return false;
-    }
-  }
-  
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(pipeType, repaType, destURIs);
-  }
-  
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    SendNode send = (SendNode) super.clone();
-    send.pipeType = pipeType;
-    send.repaType = repaType;
-    send.destURIs = Maps.newHashMap();
-    for (Entry<Integer, URI> entry : destURIs.entrySet()) {
-      send.destURIs.put(entry.getKey(), entry.getValue());
-    }
-    
-    return send;
-  }
-
-  @Override
-  public String toString() {
-    Gson gson = CoreGsonHelper.getPrettyInstance();
-    return gson.toJson(this);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
index e754a7f..c3a457e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -26,22 +26,17 @@ import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 public final class SortNode extends UnaryNode implements Cloneable {
-	@Expose
-  private SortSpec [] sortKeys;
-	
-	public SortNode() {
-		super();
-	}
+	@Expose private SortSpec [] sortKeys;
 
-  public SortNode(SortSpec[] sortKeys) {
-    super(NodeType.SORT);
+  public SortNode(int pid, SortSpec[] sortKeys) {
+    super(pid, NodeType.SORT);
     Preconditions.checkArgument(sortKeys.length > 0, 
         "At least one sort key must be specified");
     this.sortKeys = sortKeys;
   }
 
-  public SortNode(SortSpec[] sortKeys, Schema inSchema, Schema outSchema) {
-    this(sortKeys);
+  public SortNode(int pid, SortSpec[] sortKeys, Schema inSchema, Schema outSchema) {
+    this(pid, sortKeys);
     this.setInSchema(inSchema);
     this.setOutSchema(outSchema);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
index 3c44374..c3d7307 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.logical;
 
 public class StoreIndexNode extends StoreTableNode {
 
-  public StoreIndexNode(String tableName) {
-    super(tableName);
+  public StoreIndexNode(int pid, String tableName) {
+    super(pid, tableName);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index a2dd097..6872316 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -39,8 +39,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
 
-  public StoreTableNode(String tableName) {
-    super(NodeType.STORE);
+  public StoreTableNode(int pid, String tableName) {
+    super(pid, NodeType.STORE);
     this.tableName = tableName;
   }
 
@@ -48,10 +48,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.tableName;
   }
 
-  public final void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
   public void setStorageType(StoreType storageType) {
     this.storageType = storageType;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index e246466..39ae1e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -30,8 +30,8 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
   @Expose private LogicalNode subQuery;
   @Expose private Target [] targets; // unused
 
-  public TableSubQueryNode(String tableName, LogicalNode subQuery) {
-    super(NodeType.TABLE_SUBQUERY);
+  public TableSubQueryNode(int pid, String tableName, LogicalNode subQuery) {
+    super(pid, NodeType.TABLE_SUBQUERY);
     this.tableName = PlannerUtil.normalizeTableName(tableName);
     this.subQuery = subQuery;
     setOutSchema((Schema) this.subQuery.getOutSchema().clone());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 7f6e065..2e091be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -24,15 +24,11 @@ import com.google.gson.annotations.Expose;
 public abstract class UnaryNode extends LogicalNode implements Cloneable {
 	@Expose LogicalNode child;
 	
-	public UnaryNode() {
-		super();
-	}
-	
 	/**
 	 * @param type
 	 */
-	public UnaryNode(NodeType type) {
-		super(type);
+	public UnaryNode(int pid, NodeType type) {
+		super(pid, type);
 	}
 	
 	public void setChild(LogicalNode subNode) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
index a62e91b..d0e8b02 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
 
 public class UnionNode extends BinaryNode {
 
-  public UnionNode() {
-    super(NodeType.UNION);
-  }
-
-  public UnionNode(LogicalNode outer, LogicalNode inner) {
-    this();
+  public UnionNode(int pid, LogicalNode outer, LogicalNode inner) {
+    super(pid, NodeType.UNION);
     setLeftChild(outer);
     setRightChild(inner);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index c8e3d91..7e60097 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -373,7 +373,7 @@ public class GlobalEngine extends AbstractService {
         queryContext.setFileOutput();
       }
 
-      storeNode = new StoreTableNode(outputTableName);
+      storeNode = new StoreTableNode(plan.newPID(), outputTableName);
       queryContext.setOutputPath(outputPath);
 
       if (insertNode.isOverwrite()) {
@@ -416,7 +416,7 @@ public class GlobalEngine extends AbstractService {
         }
 
 
-        ProjectionNode projectionNode = new ProjectionNode(targets);
+        ProjectionNode projectionNode = new ProjectionNode(plan.newPID(), targets);
         projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
         projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));
         List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 48f45f6..620e813 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -104,12 +104,12 @@ public class GlobalPlanner {
     LOG.info(masterPlan);
   }
 
-  public static ScanNode buildInputExecutor(DataChannel channel) {
+  public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
         "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
     TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
     TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
-    return new ScanNode(desc);
+    return new ScanNode(plan.newPID(), desc);
   }
 
   public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext> {
@@ -183,7 +183,7 @@ public class GlobalPlanner {
               execBlock.setPlan(g1);
               dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
 
-              ScanNode scanNode = buildInputExecutor(dataChannel);
+              ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
               secondGroupBy.setChild(scanNode);
               masterPlan.addConnect(dataChannel);
             }
@@ -194,7 +194,7 @@ public class GlobalPlanner {
               execBlock.setPlan(g1);
               dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
 
-              ScanNode scanNode = buildInputExecutor(dataChannel);
+              ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
               secondGroupBy.setChild(scanNode);
               masterPlan.addConnect(dataChannel);
             }
@@ -224,7 +224,7 @@ public class GlobalPlanner {
           channel.setSchema(firstGroupBy.getOutSchema());
 
           GroupbyNode secondGroupBy = groupByNode;
-          ScanNode scanNode = buildInputExecutor(channel);
+          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
           secondGroupBy.setChild(scanNode);
 
           LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
@@ -248,7 +248,7 @@ public class GlobalPlanner {
         channel.setSchema(childNode.getOutSchema());
 
         SortNode secondSort = PlannerUtil.clone(lastDistNode);
-        ScanNode secondScan = buildInputExecutor(channel);
+        ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
         secondSort.setChild(secondScan);
 
         LimitNode limitAndSort;
@@ -296,8 +296,8 @@ public class GlobalPlanner {
         DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
         DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
 
-        ScanNode leftScan = buildInputExecutor(leftChannel);
-        ScanNode rightScan = buildInputExecutor(rightChannel);
+        ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+        ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
 
         joinNode.setLeftChild(leftScan);
         joinNode.setRightChild(rightScan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
deleted file mode 100644
index 899925c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-
-public class GlobalPlannerUtils {
-  private static Log LOG = LogFactory.getLog(GlobalPlannerUtils.class);
-
-  public static ScanNode newScanPlan(Schema inputSchema,
-                                     String inputTableId,
-                                     Path inputPath) {
-    TableMeta meta = CatalogUtil.newTableMeta(inputSchema, StoreType.CSV);
-    TableDesc desc = CatalogUtil.newTableDesc(inputTableId, meta, inputPath);
-    ScanNode newScan = new ScanNode(desc);
-    newScan.setInSchema(desc.getMeta().getSchema());
-    newScan.setOutSchema(desc.getMeta().getSchema());
-    return newScan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
index 59ba129..62bcf2d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.logical.IndexScanNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.Fragment;
@@ -55,7 +56,7 @@ public class IndexUtil {
     return builder.toString();
   }
   
-  public static IndexScanNode indexEval( ScanNode scanNode, 
+  public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
       Iterator<Entry<String, String>> iter ) {
    
     EvalNode qual = scanNode.getQual();
@@ -112,7 +113,7 @@ public class IndexUtil {
         datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
       }
       
-      return new IndexScanNode(scanNode, keySchema , datum , maxIndex);
+      return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
deleted file mode 100644
index cbfabf8..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestReceiveNode {
-  @Test
-  public final void testReceiveNode() throws CloneNotSupportedException {
-    ReceiveNode rec = new ReceiveNode(PipeType.PULL, RepartitionType.HASH);
-    
-    URI uri1 = URI.create("http://192.168.0.1:2190/?part=0");
-    URI uri2 = URI.create("http://192.168.0.2:2190/?part=1");
-    URI uri3 = URI.create("http://192.168.0.3:2190/?part=2");
-    URI uri4 = URI.create("http://192.168.0.4:2190/?part=3");
-    List<URI> set1 = Lists.newArrayList(uri1, uri2);
-    List<URI> set2 = Lists.newArrayList(uri3, uri4);
-    
-    rec.addData("test1", set1.get(0));
-    rec.addData("test1", set1.get(1));
-    rec.addData("test2", set2.get(0));
-    rec.addData("test2", set2.get(1));
-    
-    assertEquals(NodeType.RECEIVE, rec.getType());
-    assertEquals(PipeType.PULL, rec.getPipeType());
-    assertEquals(RepartitionType.HASH, rec.getRepartitionType());    
-    assertEquals(set1, Lists.newArrayList(rec.getSrcURIs("test1")));
-    assertEquals(set2, Lists.newArrayList(rec.getSrcURIs("test2")));
-    
-    ReceiveNode rec2 = (ReceiveNode) rec.clone();
-    assertEquals(NodeType.RECEIVE, rec2.getType());
-    assertEquals(PipeType.PULL, rec2.getPipeType());
-    assertEquals(RepartitionType.HASH, rec2.getRepartitionType());    
-    assertEquals(set1, Lists.newArrayList(rec2.getSrcURIs("test1")));
-    assertEquals(set2, Lists.newArrayList(rec2.getSrcURIs("test2")));
-    
-    assertEquals(rec, rec2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
deleted file mode 100644
index 66197de..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-import org.junit.Test;
-
-import java.net.URI;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestSendNode {
-  @Test
-  public final void testSendNode() throws CloneNotSupportedException {
-    SendNode send = new SendNode(PipeType.PULL, RepartitionType.HASH);
-    send.putDestURI(0, URI.create("http://localhost:2190"));
-    send.putDestURI(1, URI.create("http://localhost:2191"));
-    
-    assertEquals(NodeType.SEND, send.getType());
-    assertEquals(PipeType.PULL, send.getPipeType());
-    assertEquals(RepartitionType.HASH, send.getRepartitionType());
-    assertEquals(URI.create("http://localhost:2190"), send.getDestURI(0));
-    assertEquals(URI.create("http://localhost:2191"), send.getDestURI(1));
-    
-    SendNode send2 = (SendNode) send.clone();
-    assertEquals(NodeType.SEND, send2.getType());
-    assertEquals(PipeType.PULL, send2.getPipeType());
-    assertEquals(RepartitionType.HASH, send2.getRepartitionType());
-    assertEquals(URI.create("http://localhost:2190"), send2.getDestURI(0));
-    assertEquals(URI.create("http://localhost:2191"), send2.getDestURI(1));
-    
-    assertEquals(send, send2);
-  }
-}