You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:44 UTC

[47/50] [abbrv] git commit: DAG-execplan

DAG-execplan


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bc50ff78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bc50ff78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bc50ff78

Branch: refs/heads/DAG-execplan
Commit: bc50ff789c6b5c2b17fa73a02a40a3717121eede
Parents: 4537700
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Dec 27 19:30:56 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Dec 27 19:30:56 2013 +0900

----------------------------------------------------------------------
 .../engine/planner/PhysicalPlannerImpl.java     |   9 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  15 +-
 .../tajo/engine/planner/global/DataChannel.java |  13 +-
 .../engine/planner/global/ExecutionPlan.java    | 110 ++++-----
 .../engine/planner/global/GlobalPlanner.java    | 243 ++++++++-----------
 .../tajo/engine/planner/global/MasterPlan.java  |   3 +-
 .../planner/graph/SimpleDirectedGraph.java      |   5 +-
 .../tajo/master/querymaster/SubQuery.java       |   3 +-
 .../planner/physical/TestPhysicalPlanner.java   |   6 +-
 9 files changed, 178 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 19a6574..d1f34c8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -128,7 +128,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     LogicalNode root = plan.getTerminalNode();
     List<DataChannel> channels = context.getOutgoingChannels();
     for (DataChannel channel : channels) {
-      LogicalNode node = plan.getTopNodeFromPID(channel.getSrcPID());
+      LogicalNode node = plan.getPlanGroupWithPID(channel.getSrcPID()).getRootNode();
       if (node.getType() != NodeType.STORE) {
         StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
         storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
@@ -140,9 +140,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           storeTableNode.setDefaultParition();
         }
 
-        plan.remove(node, root);
-        plan.add(node, storeTableNode, EdgeType.SINGLE);
-        plan.add(storeTableNode, root, EdgeType.SINGLE);
+        LogicalNode topNode = plan.getFirstPlanGroup().toLinkedLogicalNode();
+        storeTableNode.setChild(topNode);
+        plan.setPlan(storeTableNode);
+
         channel.updateSrcPID(storeTableNode.getPID());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index e9bb071..5164b65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -492,13 +492,18 @@ public class PlannerUtil {
     Preconditions.checkNotNull(executionPlan);
     Preconditions.checkNotNull(type);
 
-    LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
-    finder.find();
-
-    if (finder.getFoundNodes().size() == 0) {
+    if (executionPlan.hasPlanGroup()) {
+      return findTopNode(executionPlan.getFirstPlanGroup().toLinkedLogicalNode(), type);
+    } else {
       return null;
     }
-    return (T) finder.getFoundNodes().get(0);
+//    LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
+//    finder.find();
+//
+//    if (finder.getFoundNodes().size() == 0) {
+//      return null;
+//    }
+//    return (T) finder.getFoundNodes().get(0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index a8eb1c3..98ee8c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.planner.global.ExecutionPlan.LogicalNodeGroup;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.PlanGroup;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -49,9 +49,8 @@ public class DataChannel {
     this.partitionType = partitionType;
   }
 
-  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum, Schema schema) {
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
     this(src.getId(), target.getId(), partitionType, partNum);
-    setSchema(schema);
   }
 
   public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
@@ -235,10 +234,12 @@ public class DataChannel {
     return targetId.getPid();
   }
 
-  public static DataChannel linkChannelAndLogicalNodeGroups(LogicalNodeGroup src, LogicalNodeGroup target,
-                                                            DataChannel channel) {
+  public static DataChannel linkChannelAndPlanGroups(PlanGroup src, PlanGroup target,
+                                                     DataChannel channel) {
     channel.srcId.setPID(src.getId());
-    channel.targetId.setPID(target.getId());
+    if (target != null) {
+      channel.targetId.setPID(target.getId());
+    }
     return channel;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index 614d128..a3506d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -45,18 +45,18 @@ public class ExecutionPlan implements GsonObject {
   @Expose private boolean hasUnionPlan;
   @Expose private boolean hasJoinPlan;
   @Expose private LogicalRootNode terminalNode;
-  @Expose private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
-  @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
+  private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+  private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
       = new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
 
-  private NavigableMap<Integer, LogicalNodeGroup> logicalNodeGroups = Maps.newTreeMap();
+  @Expose private NavigableMap<Integer, PlanGroup> planGroups = Maps.newTreeMap();
   private boolean built = false;
 
-  public static class LogicalNodeGroup {
-    private int rootPID;
-    private List<LogicalNode> nodes = Lists.newArrayList();  // order: root -> leaf
+  public static class PlanGroup {
+    @Expose private int rootPID;
+    @Expose private List<LogicalNode> nodes = Lists.newArrayList();  // order: root -> leaf
 
-    public LogicalNodeGroup(int rootPID) {
+    public PlanGroup(int rootPID) {
       setId(rootPID);
     }
 
@@ -71,12 +71,12 @@ public class ExecutionPlan implements GsonObject {
     public void addNodeAndDescendants(LogicalNode logicalNode) {
       add(logicalNode);
       if (logicalNode instanceof UnaryNode) {
-        add(((UnaryNode) logicalNode).getChild());
+        addNodeAndDescendants(((UnaryNode) logicalNode).getChild());
       } else if (logicalNode instanceof BinaryNode) {
-        add(((BinaryNode) logicalNode).getLeftChild());
-        add(((BinaryNode) logicalNode).getRightChild());
+        addNodeAndDescendants(((BinaryNode) logicalNode).getLeftChild());
+        addNodeAndDescendants(((BinaryNode) logicalNode).getRightChild());
       } else if (logicalNode instanceof TableSubQueryNode) {
-        add(((TableSubQueryNode) logicalNode).getSubQuery());
+        addNodeAndDescendants(((TableSubQueryNode) logicalNode).getSubQuery());
       }
     }
 
@@ -87,7 +87,7 @@ public class ExecutionPlan implements GsonObject {
     public LogicalNode toLinkedLogicalNode() {
       LogicalNode[] nodes = this.nodes.toArray(new LogicalNode[this.nodes.size()]);
 
-      for (int i = 0; i < nodes.length; i++) {
+      for (int i = 0; i < nodes.length;) {
         if (nodes[i] instanceof UnaryNode) {
           ((UnaryNode)nodes[i]).setChild(nodes[++i]);
         } else if (nodes[i] instanceof BinaryNode) {
@@ -95,6 +95,8 @@ public class ExecutionPlan implements GsonObject {
           ((BinaryNode)nodes[i]).setRightChild(nodes[++i]);
         } else if (nodes[i] instanceof TableSubQueryNode) {
           ((TableSubQueryNode)nodes[i]).setSubQuery(nodes[++i]);
+        } else {
+          i++;
         }
       }
       return nodes[0];
@@ -131,10 +133,7 @@ public class ExecutionPlan implements GsonObject {
     for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
       graph.removeEdge(edge.getChildId(), edge.getParentId());
     }
-    for (LogicalNodeGroup eachGroup : logicalNodeGroups.values()) {
-      eachGroup.clear();
-    }
-    logicalNodeGroups.clear();
+    planGroups.clear();
     vertices.clear();
     this.inputContext = null;
     this.hasUnionPlan = false;
@@ -152,17 +151,33 @@ public class ExecutionPlan implements GsonObject {
     }
 
     // add group
-    LogicalNodeGroup nodeGroup = new LogicalNodeGroup(topNode.getPID());
+    PlanGroup nodeGroup = new PlanGroup(topNode.getPID());
     nodeGroup.addNodeAndDescendants(topNode);
-    logicalNodeGroups.put(nodeGroup.rootPID, nodeGroup);
+    planGroups.put(nodeGroup.rootPID, nodeGroup);
+  }
+
+  public PlanGroup remoteLogicalNodeGroup(int pid) {
+    // TODO: improve the code
+    PlanGroup removed = planGroups.remove(pid);
+    Collection<PlanGroup> remain = planGroups.values();
+    clear();
+    for (PlanGroup planGroup : remain) {
+      addPlan(planGroup.toLinkedLogicalNode());
+    }
+    build();
+    return removed;
+  }
+
+  public PlanGroup getPlanGroupWithPID(int pid) {
+    return planGroups.get(pid);
   }
 
-  public LogicalNodeGroup getLogicalNodeGroupWithPID(int pid) {
-    return logicalNodeGroups.get(pid);
+  public PlanGroup getFirstPlanGroup() {
+    return planGroups.firstEntry().getValue();
   }
 
-  public LogicalNodeGroup getFirstLogicalNodeGroup() {
-    return logicalNodeGroups.firstEntry().getValue();
+  public boolean hasPlanGroup() {
+    return planGroups.size() > 0;
   }
 
   public void build() {
@@ -173,8 +188,8 @@ public class ExecutionPlan implements GsonObject {
 
     ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
 
-    for (LogicalNodeGroup logicalNodeGroup : logicalNodeGroups.values()) {
-      LogicalNode topNode = logicalNodeGroup.nodes.iterator().next();
+    for (PlanGroup planGroup : planGroups.values()) {
+      LogicalNode topNode = planGroup.nodes.iterator().next();
       builder.visit(topNode);
       this.add(topNode, terminalNode, EdgeType.SINGLE);
     }
@@ -238,6 +253,10 @@ public class ExecutionPlan implements GsonObject {
     return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
   }
 
+  public Schema getOutSchemaWithPID(int pid) {
+    return planGroups.get(pid).getRootNode().getOutSchema();
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o instanceof ExecutionPlan) {
@@ -266,12 +285,11 @@ public class ExecutionPlan implements GsonObject {
   }
 
   public LogicalNode getTopNodeFromPID(int pid) {
-    for (Integer childId : graph.getChilds(terminalNode.getPID())) {
-      if (childId == pid) {
-        return vertices.get(childId);
-      }
+    if (planGroups.containsKey(pid)) {
+      return planGroups.get(pid).getRootNode();
+    } else {
+      return null;
     }
-    return null;
   }
 
   public int getChildCount(LogicalNode node) {
@@ -318,33 +336,15 @@ public class ExecutionPlan implements GsonObject {
     @Expose private final boolean hasUnionPlan;
     @Expose private final InputContext inputContext;
     @Expose private final LogicalRootNode terminalNode;
-    @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
-    @Expose Map<Integer, List<PIDAndEdgeType>> adjacentList = new HashMap<Integer, List<PIDAndEdgeType>>();
+    @Expose private final Map<Integer, PlanGroup> planGroups;
 
     public ExecutionPlanJsonHelper(ExecutionPlan plan) {
       this.pidFactory = plan.pidFactory;
       this.hasJoinPlan = plan.hasJoinPlan;
       this.hasUnionPlan = plan.hasUnionPlan;
-      this.inputContext = plan.getInputContext();
+      this.inputContext = plan.inputContext;
       this.terminalNode = plan.terminalNode;
-      this.vertices.putAll(plan.vertices);
-      Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
-      int parentId, childId;
-      List<PIDAndEdgeType> adjacents;
-
-      // convert the graph to an adjacent list
-      for (ExecutionPlanEdge edge : edges) {
-        childId = edge.getChildId();
-        parentId = edge.getParentId();
-
-        if (adjacentList.containsKey(childId)) {
-          adjacents = adjacentList.get(childId);
-        } else {
-          adjacents = new ArrayList<PIDAndEdgeType>();
-          adjacentList.put(childId, adjacents);
-        }
-        adjacents.add(new PIDAndEdgeType(parentId, edge.getEdgeType()));
-      }
+      this.planGroups = plan.planGroups;
     }
 
     @Override
@@ -358,14 +358,10 @@ public class ExecutionPlan implements GsonObject {
       plan.hasJoinPlan = this.hasJoinPlan;
       plan.hasUnionPlan = this.hasUnionPlan;
       plan.setInputContext(this.inputContext);
-      plan.vertices.putAll(this.vertices);
-
-      for (Entry<Integer, List<PIDAndEdgeType>> e : this.adjacentList.entrySet()) {
-        LogicalNode child = this.vertices.get(e.getKey());
-        for (PIDAndEdgeType pidAndEdgeType : e.getValue()) {
-          plan.add(child, this.vertices.get(pidAndEdgeType.id), pidAndEdgeType.edgeType);
-        }
+      for (PlanGroup planGroup : planGroups.values()) {
+        plan.addPlan(planGroup.toLinkedLogicalNode());
       }
+      plan.build();
 
       return plan;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index efa4259..912f367 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -25,25 +25,22 @@ import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.PlanGroup;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.NONE_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
 
 /**
  * Build DAG
@@ -84,11 +81,14 @@ public class GlobalPlanner {
     ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
 
     if (childExecBlock.getPlan() != null) {
-      childExecBlock.getPlan().build();
       ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
-      DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, lastNode.getPID(), -1, NONE_PARTITION, 1,
-          lastNode.getOutSchema());
+      DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
       dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
+      dataChannel.setSchema(lastNode.getOutSchema());
+      if (childExecBlock.getPlan().hasPlanGroup()) {
+        DataChannel.linkChannelAndPlanGroups(childExecBlock.getPlan().getFirstPlanGroup(),
+            null, dataChannel);
+      }
       masterPlan.addConnect(dataChannel);
       masterPlan.setTerminal(terminalBlock);
     } else {
@@ -98,14 +98,6 @@ public class GlobalPlanner {
     LOG.info(masterPlan);
   }
 
-  public static ScanNode buildInputExecutor(LogicalPlan plan, Schema schema, ExecutionBlockId srcId, ExecutionBlockId targetId, StoreType storeType) {
-    Preconditions.checkArgument(schema != null,
-        "Channel schema (" + srcId +" -> "+ targetId + ") is not initialized");
-    TableMeta meta = new TableMeta(storeType, new Options());
-    TableDesc desc = new TableDesc(srcId.toString(), schema, meta, new Path("/"));
-    return new ScanNode(plan.newPID(), desc);
-  }
-
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
         "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -116,31 +108,12 @@ public class GlobalPlanner {
 
   private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
                                                 ExecutionBlock parent, JoinNode join, boolean leftTable) {
-    ExecutionBlock childBlock;
-    int targetPid;
-    if (leftTable) {
-      childBlock = leftBlock;
-      if (!childBlock.getPlan().isBuilt()) {
-        childBlock.getPlan().build();
-      }
-      targetPid = parent.getInputContext().getScanNodes()[0].getPID();
-    } else {
-      childBlock = rightBlock;
-      if (!childBlock.getPlan().isBuilt()) {
-        childBlock.getPlan().build();
-      }
-      targetPid = parent.getInputContext().getScanNodes()[1].getPID();
-    }
-    LogicalNode srcTopNode = childBlock.getPlan().getTopNode(0);
-    int srcPid = srcTopNode.getPID();
-    if (!parent.getPlan().isBuilt()) {
-      parent.getPlan().build();
-    }
+    ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
 
-    DataChannel channel = new DataChannel(childBlock, parent, srcPid, targetPid, HASH_PARTITION, 32,
-        srcTopNode.getOutSchema());
+    DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
     channel.setStoreType(storeType);
     if (join.getJoinType() != JoinType.CROSS) {
+      // Each block should have only one output schema
       Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
           leftBlock.getPlan().getOutSchema(0), rightBlock.getPlan().getOutSchema(0));
       if (leftTable) {
@@ -197,22 +170,21 @@ public class GlobalPlanner {
 
     // symmetric repartition join
     currentBlock = masterPlan.newExecutionBlock();
-    leftBlock.getPlan().build();
-    rightBlock.getPlan().build();
-    LogicalNode leftTopNode = leftBlock.getPlan().getTopNode(0);
-    LogicalNode rightTopNode = rightBlock.getPlan().getTopNode(0);
 
-    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftTopNode.getOutSchema(),
-        leftBlock.getId(), currentBlock.getId(), storeType);
-    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightTopNode.getOutSchema(),
-        rightBlock.getId(), currentBlock.getId(), storeType);
+    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
+    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
 
     joinNode.setLeftChild(leftScan);
     joinNode.setRightChild(rightScan);
     currentBlock.setPlan(joinNode);
 
-    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
-    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+    DataChannel.linkChannelAndPlanGroups(leftBlock.getPlan().getFirstPlanGroup(),
+        currentBlock.getPlan().getFirstPlanGroup(), leftChannel);
+    DataChannel.linkChannelAndPlanGroups(rightBlock.getPlan().getFirstPlanGroup(),
+        currentBlock.getPlan().getFirstPlanGroup(), rightChannel);
 
     masterPlan.addConnect(leftChannel);
     masterPlan.addConnect(rightChannel);
@@ -254,21 +226,21 @@ public class GlobalPlanner {
     SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
     currentBlock.getEnforcer().enforceSortAggregation(groupbyNode.getPID(), sortSpecs);
 
-    // setup current block with channel
-    ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), topMostOfFirstPhase.getOutSchema(),
-        childBlock.getId(), currentBlock.getId(), storeType);
-    groupbyNode.setChild(scanNode);
-    currentBlock.setPlan(groupbyNode);
 
     // setup channel
-    childBlock.getPlan().build();
-    currentBlock.getPlan().build();
     DataChannel channel;
-    channel = new DataChannel(childBlock, currentBlock, topMostOfFirstPhase.getPID(), scanNode.getPID(), HASH_PARTITION,
-        32, topMostOfFirstPhase.getOutSchema());
+    channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
     channel.setPartitionKey(groupbyNode.getGroupingColumns());
+    channel.setSchema(topMostOfFirstPhase.getOutSchema());
     channel.setStoreType(storeType);
 
+    // setup current block with channel
+    ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
+    groupbyNode.setChild(scanNode);
+    currentBlock.setPlan(groupbyNode);
+
+    DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+        currentBlock.getPlan().getFirstPlanGroup(), channel);
     context.plan.addConnect(channel);
 
     return currentBlock;
@@ -300,10 +272,8 @@ public class GlobalPlanner {
           dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
 
           ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-          ExecutionPlan subBlockPlan = subBlock.getPlan();
-          LogicalNode topNodeOfSubBlock = subBlockPlan.getLogicalNodeGroupWithPID(dataChannel.getSrcPID())
-              .toLinkedLogicalNode();
           GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), firstPhaseGroupBy);
+          LogicalNode topNodeOfSubBlock = subBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
           g1.setChild(topNodeOfSubBlock);
           subBlock.setPlan(g1);
 
@@ -316,28 +286,24 @@ public class GlobalPlanner {
         childBlock.setPlan(firstPhaseGroupBy);
         currentBlock = masterPlan.newExecutionBlock();
 
-        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), firstPhaseGroupBy.getOutSchema(),
-            childBlock.getId(), currentBlock.getId(), storeType);
-        groupbyNode.setChild(scanNode);
-        groupbyNode.setInSchema(scanNode.getOutSchema());
-        currentBlock.setPlan(groupbyNode);
-
         DataChannel channel;
-        childBlock.getPlan().build();
-        currentBlock.getPlan().build();
-        int srcPID = childBlock.getPlan().getTopNodePid(0);
-        int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
-        int partitionNum;
         if (firstPhaseGroupBy.isEmptyGrouping()) {
-          partitionNum = 1;
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         } else {
-          partitionNum = 32;
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         }
-        channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, partitionNum,
-            firstPhaseGroupBy.getOutSchema());
-        channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+        channel.setSchema(firstPhaseGroupBy.getOutSchema());
         channel.setStoreType(storeType);
 
+        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+        groupbyNode.setChild(scanNode);
+        groupbyNode.setInSchema(scanNode.getOutSchema());
+        currentBlock.setPlan(groupbyNode);
+
+        DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+            currentBlock.getPlan().getFirstPlanGroup(), channel);
         masterPlan.addConnect(channel);
       }
     }
@@ -350,7 +316,7 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock;
 
     SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
-    LogicalNode childBlockPlan = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
+    LogicalNode childBlockPlan = childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
     firstSortNode.setChild(childBlockPlan);
     // sort is a non-projectable operator. So, in/out schemas are the same to its child operator.
     firstSortNode.setInSchema(childBlockPlan.getOutSchema());
@@ -358,24 +324,18 @@ public class GlobalPlanner {
     childBlock.setPlan(firstSortNode);
 
     currentBlock = masterPlan.newExecutionBlock();
-    childBlock.getPlan().build();
-    int srcPID = childBlock.getPlan().getTopNodePid(0);
-    int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
+    channel.setSchema(firstSortNode.getOutSchema());
+    channel.setStoreType(storeType);
 
-    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), firstSortNode.getOutSchema(),
-        childBlock.getId(), currentBlock.getId(), storeType);
+    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
     currentNode.setChild(secondScan);
     currentNode.setInSchema(secondScan.getOutSchema());
     currentBlock.setPlan(currentNode);
 
-    childBlock.getPlan().build();
-    currentBlock.getPlan().build();
-
-    DataChannel channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32,
-        firstSortNode.getOutSchema());
-    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
-//    channel.setSchema(firstSortNode.getOutSchema());
-    channel.setStoreType(storeType);
+    DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+        currentBlock.getPlan().getFirstPlanGroup(), channel);
     masterPlan.addConnect(channel);
 
     return currentBlock;
@@ -389,9 +349,8 @@ public class GlobalPlanner {
 
     // if result table is not a partitioned table, directly store it
     if(partitionDesc == null) {
-      LogicalNode topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
-      currentNode.setChild(topNodeOfBlock);
-      currentNode.setInSchema(topNodeOfBlock.getOutSchema());
+      currentNode.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+      currentNode.setInSchema(childBlock.getPlan().getOutSchema(0));
       childBlock.setPlan(currentNode);
       return childBlock;
     }
@@ -402,35 +361,23 @@ public class GlobalPlanner {
     LogicalNode childNode = currentNode.getChild();
     childBlock.setPlan(childNode);
 
-    // 2. create a ScanNode for scanning shuffle data
-    //    StoreTableNode as the root node of the new execution block
+    // 2. create a new execution block, pipeline 2 exec blocks through a DataChannel
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
-    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), childNode.getOutSchema(),
-        childBlock.getId(), currentBlock.getId(), storeType);
-    currentNode.setChild(scanNode);
-    currentNode.setInSchema(scanNode.getOutSchema());
-    currentBlock.setPlan(currentNode);
-
-    // 3. pipeline 2 exec blocks through a DataChannel
     DataChannel channel = null;
     CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
-    childBlock.getPlan().build();
-    currentBlock.getPlan().build();
-    int srcPID = childBlock.getPlan().getTopNodePid(0);
-    int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
     if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
-      channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, childNode.getOutSchema());
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
       Column[] columns = new Column[partitionDesc.getColumns().size()];
       channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-//      channel.setSchema(childNode.getOutSchema());
+      channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
     } else if (partitionsType == CatalogProtos.PartitionsType.HASH) {
-      channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION,
-          partitionDesc.getNumPartitions(), childNode.getOutSchema());
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION,
+          partitionDesc.getNumPartitions());
       Column[] columns = new Column[partitionDesc.getColumns().size()];
       channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
-//      channel.setSchema(childNode.getOutSchema());
+      channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
     } else if(partitionsType == CatalogProtos.PartitionsType.RANGE) {
       // TODO
@@ -438,6 +385,16 @@ public class GlobalPlanner {
       // TODO
     }
 
+    // 3. create a ScanNode for scanning shuffle data
+    //    StoreTableNode as the root node of the new execution block
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    currentNode.setChild(scanNode);
+    currentNode.setInSchema(scanNode.getOutSchema());
+    currentBlock.setPlan(currentNode);
+
+    DataChannel.linkChannelAndPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
+        currentBlock.getPlan().getFirstPlanGroup(), channel);
+
     masterPlan.addConnect(channel);
 
     return currentBlock;
@@ -458,10 +415,9 @@ public class GlobalPlanner {
       LogicalNode child = super.visitProjection(context, plan, node, stack);
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-      LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
-
-      node.setChild(nodeOfExecBlock);
-      node.setInSchema(nodeOfExecBlock.getOutSchema());
+      execBlock.getPlan().build();
+      node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+      node.setInSchema(execBlock.getPlan().getOutSchema(0));
       execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);
       return node;
@@ -473,42 +429,36 @@ public class GlobalPlanner {
       LogicalNode child = super.visitLimit(context, plan, node, stack);
 
       ExecutionBlock block;
-      LogicalNode topNodeOfBlock;
       block = context.execBlockMap.remove(child.getPID());
       if (child.getType() == NodeType.SORT) {
-        topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
-        node.setChild(topNodeOfBlock);
+        node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
         block.setPlan(node);
 
         ExecutionBlock childBlock = context.plan.getChild(block, 0);
         LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
-        topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
-        childLimit.setChild(topNodeOfBlock);
+        childLimit.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
         childBlock.setPlan(childLimit);
 
         DataChannel channel = context.plan.getChannel(childBlock, block);
         channel.setPartitionNum(1);
         context.execBlockMap.put(node.getPID(), block);
       } else {
-        topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
-        node.setChild(topNodeOfBlock);
+        node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
         block.setPlan(node);
 
         ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
-        ScanNode scanNode = buildInputExecutor(plan, node.getOutSchema(), block.getId(), newExecBlock.getId(), storeType);
+        DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
+        newChannel.setPartitionKey(new Column[]{});
+        newChannel.setSchema(node.getOutSchema());
+        newChannel.setStoreType(storeType);
+
+        ScanNode scanNode = buildInputExecutor(plan, newChannel);
         LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
         parentLimit.setChild(scanNode);
         newExecBlock.setPlan(parentLimit);
 
-        block.getPlan().build();
-        newExecBlock.getPlan().build();
-        int srcPID = block.getPlan().getTopNodePid(0);
-        int targetPID = newExecBlock.getInputContext().getScanNodes()[0].getPID();
-        DataChannel newChannel = new DataChannel(block, newExecBlock, srcPID, targetPID, HASH_PARTITION, 1, node.getOutSchema());
-        newChannel.setPartitionKey(new Column[]{});
-//        newChannel.setSchema(node.getOutSchema());
-        newChannel.setStoreType(storeType);
-
+        DataChannel.linkChannelAndPlanGroups(block.getPlan().getFirstPlanGroup(),
+            newExecBlock.getPlan().getFirstPlanGroup(), newChannel);
         context.plan.addConnect(newChannel);
         context.execBlockMap.put(parentLimit.getPID(), newExecBlock);
         node = parentLimit;
@@ -548,9 +498,9 @@ public class GlobalPlanner {
       LogicalNode child = super.visitFilter(context, plan, node, stack);
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-      LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
-      node.setChild(nodeOfExecBlock);
-      node.setInSchema(nodeOfExecBlock.getOutSchema());
+      execBlock.getPlan().build();
+      node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
+      node.setInSchema(execBlock.getPlan().getOutSchema(0));
       execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);
 
@@ -597,30 +547,27 @@ public class GlobalPlanner {
       }
 
       ExecutionBlock execBlock;
-      int targetPID;
+      PlanGroup parentPlanGroup = null;
       if (unionBlocks.size() == 0) {
         execBlock = context.plan.newExecutionBlock();
-        targetPID = -1;
       } else {
         execBlock = unionBlocks.get(0);
-        targetPID = execBlock.getPlan().getTopNodePid(0);
+        parentPlanGroup = execBlock.getPlan().getFirstPlanGroup();
       }
 
       for (ExecutionBlock childBlocks : unionBlocks) {
-        ExecutionPlan executionPlan = childBlocks.getPlan();
-        executionPlan.build();
-        LogicalNode unionCandidate = executionPlan.getTopNode(0);
-        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.LEFT).getPID()));
-        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.RIGHT).getPID()));
+        UnionNode union = (UnionNode) childBlocks.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
+        queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
+        queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
       }
 
-      execBlock.getPlan().build();
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
-        childBlocks.getPlan().build();
-        LogicalNode topNode = childBlocks.getPlan().getTopNode(0);
-        int srcPID = topNode.getPID();
-        DataChannel channel = new DataChannel(childBlocks, execBlock, srcPID, targetPID, NONE_PARTITION, 1, topNode.getOutSchema());
+        DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
         channel.setStoreType(storeType);
+        if (childBlocks.getPlan().hasPlanGroup()) {
+          DataChannel.linkChannelAndPlanGroups(childBlocks.getPlan().getFirstPlanGroup(),
+              parentPlanGroup, channel);
+        }
         context.plan.addConnect(channel);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6d03642..d648dec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -26,7 +26,6 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 
 import java.util.ArrayList;
@@ -124,7 +123,7 @@ public class MasterPlan {
 
   @VisibleForTesting
   public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
-    addConnect(new DataChannel(src, target, null, null, type));
+    addConnect(new DataChannel(src, target, type));
   }
 
   public boolean isConnected(ExecutionBlock src, ExecutionBlock target) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
index 8d0eecf..6cecf6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner.graph;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.gson.annotations.Expose;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.util.TUtil;
 
@@ -34,9 +33,9 @@ import java.util.*;
  */
 public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
   /** map: child -> parent */
-  @Expose protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
+  protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
   /** map: parent -> child */
-  @Expose protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
+  protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
 
   @Override
   public int getVertexSize() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 48c7f71..147c6cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -514,6 +514,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         } else {
           ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
           DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
+          execBlock.getPlan().build();
+          parent.getPlan().build();
           setRepartitionIfNecessary(subQuery, channel);
           createTasks(subQuery);
 
@@ -656,7 +658,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     private static void createTasks(SubQuery subQuery) throws IOException {
       MasterPlan masterPlan = subQuery.getMasterPlan();
       ExecutionBlock execBlock = subQuery.getBlock();
-//      execBlock.getPlan().build();
       QueryUnit [] tasks;
       if (execBlock.getInputContext() == null) {
         return;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc50ff78/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 48211b4..790a14d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -472,7 +472,7 @@ public class TestPhysicalPlanner {
     Column key1 = new Column("score.deptName", Type.TEXT);
     Column key2 = new Column("score.class", Type.TEXT);
     DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+        PartitionType.HASH_PARTITION, numPartitions);
     dataChannel.setSchema(rootNode.getOutSchema());
     dataChannel.setPartitionKey(new Column[]{key1, key2});
     List<DataChannel> channels = new ArrayList<DataChannel>();
@@ -534,7 +534,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode = optimizer.optimize(plan);
     int numPartitions = 1;
     DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+        PartitionType.HASH_PARTITION, numPartitions);
     dataChannel.setSchema(rootNode.getOutSchema());
     dataChannel.setPartitionKey(new Column[]{});
     List<DataChannel> channels = new ArrayList<DataChannel>();
@@ -852,7 +852,7 @@ public class TestPhysicalPlanner {
 
     SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
     DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
-        ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.RANGE_PARTITION);
+        PartitionType.RANGE_PARTITION);
     channel.setSchema(rootNode.getOutSchema());
     channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
     List<DataChannel> channels = new ArrayList<DataChannel>();