You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/26 10:14:36 UTC

[4/4] git commit: TAJO-266: Extend ExecutionBlock and Task to support multiple outputs. (jihoon)

TAJO-266: Extend ExecutionBlock and Task to support multiple outputs. (jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/7c97735e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/7c97735e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/7c97735e

Branch: refs/heads/DAG-execplan
Commit: 7c97735e1008e61f00ddbd41ba924a00d5b8baf3
Parents: 67e0d94
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Nov 26 18:13:11 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Nov 26 18:13:11 2013 +0900

----------------------------------------------------------------------
 .../java/org/apache/tajo/catalog/Schema.java    |   5 +
 .../java/org/apache/tajo/catalog/TableDesc.java |   5 +
 .../apache/tajo/engine/json/CoreGsonHelper.java |   4 +
 .../tajo/engine/json/ExecutionPlanAdapter.java  |  49 ++
 .../tajo/engine/json/SourceContextAdapter.java  |  47 ++
 .../apache/tajo/engine/planner/LogicalPlan.java |   5 +-
 .../tajo/engine/planner/PhysicalPlanner.java    |   6 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 463 +++++++++++--------
 .../apache/tajo/engine/planner/PlannerUtil.java | 227 +++++++++
 .../tajo/engine/planner/global/DataChannel.java |  50 +-
 .../planner/global/DestinationContext.java      | 104 +++++
 .../engine/planner/global/ExecutionBlock.java   |  73 +--
 .../engine/planner/global/ExecutionPlan.java    | 381 +++++++++++++++
 .../planner/global/ExecutionPlanEdge.java       |  51 ++
 .../engine/planner/global/GlobalPlanner.java    | 175 ++++---
 .../engine/planner/global/InputContext.java     |  78 ++++
 .../tajo/engine/planner/global/MasterPlan.java  |  21 +-
 .../engine/planner/graph/DirectedGraph.java     |   6 +-
 .../planner/graph/SimpleDirectedGraph.java      |  35 +-
 .../planner/graph/SimpleUndirectedGraph.java    |  12 +-
 .../tajo/engine/planner/logical/ExceptNode.java |  19 +-
 .../engine/planner/logical/GroupbyNode.java     |  12 +-
 .../engine/planner/logical/IntersectNode.java   |  14 +-
 .../tajo/engine/planner/logical/JoinNode.java   |   8 +-
 .../tajo/engine/planner/logical/LimitNode.java  |   4 +-
 .../engine/planner/logical/LogicalNode.java     |  14 +-
 .../engine/planner/logical/LogicalRootNode.java |   6 +-
 .../tajo/engine/planner/logical/NodeType.java   |   1 -
 .../engine/planner/logical/ProjectionNode.java  |   8 +-
 .../tajo/engine/planner/logical/ScanNode.java   |   4 +-
 .../engine/planner/logical/SelectionNode.java   |   8 +-
 .../tajo/engine/planner/logical/SortNode.java   |   6 +-
 .../engine/planner/logical/StoreTableNode.java  |   6 +-
 .../tajo/engine/planner/logical/UnionNode.java  |  19 +-
 .../planner/physical/IndexedStoreExec.java      |  12 +-
 .../planner/physical/MultiOutputExec.java       |  52 +++
 .../planner/physical/PartitionedStoreExec.java  |  27 +-
 .../planner/physical/PhysicalRootExec.java      |  81 ++++
 .../engine/planner/physical/StoreTableExec.java |   6 +-
 .../tajo/engine/query/QueryUnitRequest.java     |   3 +-
 .../tajo/engine/query/QueryUnitRequestImpl.java |  82 +++-
 .../tajo/master/DefaultTaskScheduler.java       |  11 +-
 .../master/querymaster/QueryMasterTask.java     |   2 +-
 .../tajo/master/querymaster/QueryUnit.java      |  60 +--
 .../tajo/master/querymaster/Repartitioner.java  |  52 ++-
 .../tajo/master/querymaster/SubQuery.java       |  30 +-
 .../org/apache/tajo/worker/TajoQueryEngine.java |   7 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  57 ++-
 .../apache/tajo/worker/TaskAttemptContext.java  |  19 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   8 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   4 +-
 .../planner/global/TestExecutionPlan.java       |  98 ++++
 .../engine/planner/global/TestMasterPlan.java   |   6 +-
 .../planner/physical/TestBNLJoinExec.java       |  24 +-
 .../planner/physical/TestBSTIndexExec.java      |  12 +-
 .../planner/physical/TestExternalSortExec.java  |  14 +-
 .../physical/TestFullOuterHashJoinExec.java     |  37 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  56 ++-
 .../planner/physical/TestHashAntiJoinExec.java  |  12 +-
 .../planner/physical/TestHashJoinExec.java      |  10 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   8 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  77 +--
 .../physical/TestLeftOuterNLJoinExec.java       |  72 +--
 .../planner/physical/TestMergeJoinExec.java     |  14 +-
 .../engine/planner/physical/TestNLJoinExec.java |  20 +-
 .../planner/physical/TestPhysicalPlanner.java   | 222 ++++++---
 .../physical/TestRightOuterHashJoinExec.java    |  51 +-
 .../physical/TestRightOuterMergeJoinExec.java   | 103 +++--
 .../engine/planner/physical/TestSortExec.java   |  12 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |  24 +-
 70 files changed, 2512 insertions(+), 799 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..404af75 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,6 +212,11 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
     }
   }
 
+  @Override
+  public int hashCode() { // hash is delegated to the object returned by getProto()
+    return getProto().hashCode(); // NOTE(review): confirm this stays consistent with equals(Object) declared right after
+  }
+
 	@Override
 	public boolean equals(Object o) {
 		if (o instanceof Schema) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..5573de6 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -158,4 +158,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
     }
     return builder.build();
   }
+
+  @Override
+  public int hashCode() { // hash is delegated to the object returned by getProto()
+    return getProto().hashCode(); // NOTE(review): no equals() is visible in this hunk; confirm the class defines a matching one
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
index 4dfb314..194d2b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
@@ -30,6 +30,8 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.InputContext;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.json.*;
 import org.apache.tajo.util.TUtil;
@@ -56,6 +58,8 @@ public class CoreGsonHelper {
     adapters.put(AggFunction.class, new FunctionAdapter());
     adapters.put(Datum.class, new DatumAdapter());
     adapters.put(DataType.class, new DataTypeAdapter());
+    adapters.put(ExecutionPlan.class, new ExecutionPlanAdapter());
+    adapters.put(InputContext.class, new SourceContextAdapter());
 
     return adapters;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
new file mode 100644
index 0000000..7ee8347
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.ExecutionPlanJsonHelper;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class ExecutionPlanAdapter implements GsonSerDerAdapter<ExecutionPlan> { // Gson adapter: ExecutionPlan <-> {"type": "ExecutionPlan", "body": <helper JSON>}
+
+  @Override
+  public ExecutionPlan deserialize(JsonElement jsonElement, Type type,
+                                   JsonDeserializationContext context) throws JsonParseException {
+    JsonObject json = jsonElement.getAsJsonObject();
+    String typeName = json.get("type").getAsJsonPrimitive().getAsString(); // NOTE(review): a missing "type" member makes json.get return null here; confirm inputs always carry it
+    Preconditions.checkState(typeName.equals("ExecutionPlan")); // any other tag fails with IllegalStateException (Guava), not JsonParseException
+    ExecutionPlanJsonHelper helper = context.deserialize(json.get("body"), ExecutionPlanJsonHelper.class); // "body" carries the helper's serialized form
+    return helper.toExecutionPlan(); // the helper rebuilds the plan object
+  }
+
+  @Override
+  public JsonElement serialize(ExecutionPlan src, Type type, JsonSerializationContext context) {
+    ExecutionPlanJsonHelper helper = new ExecutionPlanJsonHelper(src); // the plan is serialized through its JSON helper, not directly
+    JsonObject json = new JsonObject();
+    json.addProperty("type", "ExecutionPlan"); // type tag verified by deserialize()
+    json.add("body", context.serialize(helper, ExecutionPlanJsonHelper.class));
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
new file mode 100644
index 0000000..d92d504
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.InputContext;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class SourceContextAdapter implements GsonSerDerAdapter<InputContext> { // Gson adapter for InputContext (class name differs from the adapted type); JSON form is {"body": [ScanNode, ...]}
+
+  @Override
+  public InputContext deserialize(JsonElement jsonElement, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+    JsonObject json = jsonElement.getAsJsonObject();
+    ScanNode[] scanNodes = context.deserialize(json.get("body"), ScanNode[].class); // NOTE(review): a JSON null or missing "body" would leave scanNodes null and fail in the loop; confirm
+    InputContext srcContext = new InputContext(); // rebuilt by re-adding each deserialized scan node in array order
+    for (ScanNode scan : scanNodes) {
+      srcContext.addScanNode(scan);
+    }
+    return srcContext;
+  }
+
+  @Override
+  public JsonElement serialize(InputContext src, Type typeOfSrc, JsonSerializationContext context) {
+    JsonObject json = new JsonObject(); // unlike ExecutionPlanAdapter, no "type" tag is written
+    json.add("body", context.serialize(src.getScanNodes(), ScanNode[].class)); // NOTE(review): assumes getScanNodes() is serializable as ScanNode[]; confirm its return type
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 4a305ae..3295efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.*;
@@ -122,11 +123,13 @@ public class LogicalPlan {
   }
 
   public void connectBlocks(QueryBlock srcBlock, QueryBlock targetBlock, BlockType type) {
+    Preconditions.checkState(queryBlockGraph.getParentCount(srcBlock.getName()) <= 0, // throws IllegalStateException unless srcBlock currently has no parent in the block graph
+        "There should be only one parent block."); // i.e. a block may be connected to at most one parent
     queryBlockGraph.addEdge(srcBlock.getName(), targetBlock.getName(), new BlockEdge(srcBlock, targetBlock, type));
   }
 
   public QueryBlock getParentBlock(QueryBlock block) {
-    return queryBlocks.get(queryBlockGraph.getParent(block.getName()));
+    return queryBlocks.get(queryBlockGraph.getParent(block.getName(), 0));
   }
 
   public List<QueryBlock> getChildBlocks(QueryBlock block) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
index ebe47b4..90cfc9c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -21,16 +21,16 @@
  */
 package org.apache.tajo.engine.planner;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 /**
  * This class generates a physical execution plan.
  */
 public interface PhysicalPlanner {
   public PhysicalExec createPlan(TaskAttemptContext context,
-                                 LogicalNode logicalPlan)
+                                 ExecutionPlan execPlan)
       throws InternalException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index db58e32..87daa8f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,44 +16,45 @@
  * limitations under the License.
  */
 
-/**
- *
- */
 package org.apache.tajo.engine.planner;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
   private static final int UNGENERATED_PID = -1;
@@ -66,54 +67,74 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     this.sm = sm;
   }
 
-  public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
+  public PhysicalExec createPlan(final TaskAttemptContext context, ExecutionPlan plan)
       throws InternalException {
 
     PhysicalExec execPlan;
 
     try {
-      execPlan = createPlanRecursive(context, logicalPlan);
-      if (execPlan instanceof StoreTableExec
-          || execPlan instanceof IndexedStoreExec
-          || execPlan instanceof PartitionedStoreExec) {
-        return execPlan;
-      } else if (context.getDataChannel() != null) {
-        return buildOutputOperator(context, logicalPlan, execPlan);
-      } else {
-        return execPlan;
-      }
+      plan = checkOutputOperator(context, plan);
+      execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+      return execPlan;
     } catch (IOException ioe) {
       throw new InternalException(ioe);
     }
   }
 
-  private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
-                                           PhysicalExec execPlan) throws IOException {
-    DataChannel channel = context.getDataChannel();
-    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
-    storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
-    storeTableNode.setInSchema(plan.getOutSchema());
-    storeTableNode.setOutSchema(plan.getOutSchema());
-    if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
-      storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
-    } else {
-      storeTableNode.setDefaultParition();
+  @VisibleForTesting
+  public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
+      throws InternalException {
+    PhysicalExec execPlan;
+
+    try {
+      execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+      return execPlan;
+    } catch (IOException ioe) {
+      throw new InternalException(ioe);
     }
-    storeTableNode.setChild(plan);
+  }
 
-    PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
-    return outExecPlan;
+  private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) { // inserts a STORE node for every outgoing channel whose source node is not one; mutates and returns the given plan
+    LogicalNode root = plan.getTerminalNode();
+    List<DataChannel> channels = context.getOutgoingChannels();
+    for (DataChannel channel : channels) {
+      LogicalNode node = plan.getRootChild(channel.getSrcPID()); // NOTE(review): presumably the root's child with this PID; a null result would fail on the next line, so confirm
+      if (node.getType() != NodeType.STORE) {
+        StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString()); // synthetic store node named after the channel's target id
+        storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+        storeTableNode.setInSchema(channel.getSchema()); // input and output schema both come from the channel
+        storeTableNode.setOutSchema(channel.getSchema());
+        if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+          storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
+        } else {
+          storeTableNode.setDefaultParition();
+        }
+
+        plan.remove(node, root); // splice: node -> root becomes node -> storeTableNode -> root
+        plan.add(node, storeTableNode, Tag.SINGLE);
+        plan.add(storeTableNode, root, Tag.SINGLE);
+        channel.updateSrcPID(storeTableNode.getPID()); // NOTE(review): the node was built with UNGENERATED_PID (-1); confirm a real PID is assigned before this read
+      }
+    }
+    return plan;
   }
 
-  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
     PhysicalExec leftExec;
     PhysicalExec rightExec;
+    PhysicalExec currentExec;
 
     switch (logicalNode.getType()) {
 
       case ROOT:
         LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
-        return createPlanRecursive(ctx, rootNode.getChild());
+        List<PhysicalExec> childExecs = new ArrayList<PhysicalExec>();
+        for (LogicalNode child : plan.getChilds(rootNode)) {
+          childExecs.add(createPlanRecursive(ctx, plan, child));
+        }
+        return new PhysicalRootExec(ctx, childExecs);
 
       case EXPRS:
         EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,60 +142,109 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
       case STORE:
         StoreTableNode storeNode = (StoreTableNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, storeNode.getChild());
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(storeNode).get(0));
         return createStorePlan(ctx, storeNode, leftExec);
 
       case SELECTION:
         SelectionNode selNode = (SelectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, selNode.getChild());
-        return new SelectionExec(ctx, selNode, leftExec);
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(selNode).get(0));
+        currentExec = new SelectionExec(ctx, selNode, leftExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case PROJECTION:
         ProjectionNode prjNode = (ProjectionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, prjNode.getChild());
-        return new ProjectionExec(ctx, prjNode, leftExec);
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(prjNode).get(0));
+        currentExec = new ProjectionExec(ctx, prjNode, leftExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case TABLE_SUBQUERY: {
         TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
-        return leftExec;
+        leftExec = createPlanRecursive(ctx, plan, subQueryNode.getSubQuery());
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+        } else {
+          return leftExec;
+        }
 
       } case SCAN:
         leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
-        return leftExec;
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+        } else {
+          return leftExec;
+        }
 
       case GROUP_BY:
         GroupbyNode grpNode = (GroupbyNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, grpNode.getChild());
-        return createGroupByPlan(ctx, grpNode, leftExec);
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(grpNode).get(0));
+        currentExec = createGroupByPlan(ctx, plan, grpNode, leftExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case SORT:
         SortNode sortNode = (SortNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, sortNode.getChild());
-        return createSortPlan(ctx, sortNode, leftExec);
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(sortNode).get(0));
+        currentExec = createSortPlan(ctx, plan, sortNode, leftExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case JOIN:
         JoinNode joinNode = (JoinNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
-        return createJoinPlan(ctx, joinNode, leftExec, rightExec);
+        List<LogicalNode> childs = plan.getChilds(joinNode);
+        leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+        rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+        currentExec = createJoinPlan(ctx, plan, joinNode, leftExec, rightExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case UNION:
         UnionNode unionNode = (UnionNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
-        rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
-        return new UnionExec(ctx, leftExec, rightExec);
+        childs = plan.getChilds(unionNode);
+        leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+        rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+        currentExec = new UnionExec(ctx, leftExec, rightExec);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case LIMIT:
         LimitNode limitNode = (LimitNode) logicalNode;
-        leftExec = createPlanRecursive(ctx, limitNode.getChild());
-        return new LimitExec(ctx, limitNode.getInSchema(),
+        leftExec = createPlanRecursive(ctx, plan, plan.getChilds(limitNode).get(0));
+        currentExec = new LimitExec(ctx, limitNode.getInSchema(),
             limitNode.getOutSchema(), leftExec, limitNode);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       case BST_INDEX_SCAN:
         IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
-        leftExec = createIndexScanExec(ctx, indexScanNode);
-        return leftExec;
+        currentExec = createIndexScanExec(ctx, indexScanNode);
+        if (plan.getParentCount(logicalNode) > 1) {
+          return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+        } else {
+          return currentExec;
+        }
 
       default:
         return null;
@@ -194,107 +264,108 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return size;
   }
 
-  public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
-                                     PhysicalExec rightExec) throws IOException {
+  public PhysicalExec createJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode joinNode,
+                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
 
     switch (joinNode.getJoinType()) {
       case CROSS:
-        return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
+        return createCrossJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case INNER:
-        return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
+        return createInnerJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case LEFT_OUTER:
-        return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+        return createLeftOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case RIGHT_OUTER:
-        return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+        return createRightOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case FULL_OUTER:
-        return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
+        return createFullOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case LEFT_SEMI:
-        return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+        return createLeftSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case RIGHT_SEMI:
-        return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
+        return createRightSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case LEFT_ANTI:
-        return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+        return createLeftAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       case RIGHT_ANTI:
-        return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
+        return createRightAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
 
       default:
         throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
     }
   }
 
-  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
 
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
 
       switch (algorithm) {
         case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + join.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, join, leftExec, rightExec);
         case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + join.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, join, leftExec, rightExec);
         default:
           // fallback algorithm
           LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          return new BNLJoinExec(context, join, leftExec, rightExec);
       }
 
     } else {
-      return new BNLJoinExec(context, plan, leftExec, rightExec);
+      return new BNLJoinExec(context, join, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
                                            PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, node);
 
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
 
       switch (algorithm) {
         case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + node.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, node, leftExec, rightExec);
         case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + node.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, node, leftExec, rightExec);
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
-          return new HashJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + node.getPID() +") chooses [In-memory Hash Join]");
+          return new HashJoinExec(context, node, leftExec, rightExec);
         case MERGE_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
-          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+          LOG.info("Join (" + node.getPID() +") chooses [Sort Merge Join]");
+          return createMergeInnerJoin(context, node, leftExec, rightExec);
         case HYBRID_HASH_JOIN:
 
         default:
           LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
-          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+          return createMergeInnerJoin(context, node, leftExec, rightExec);
       }
 
 
     } else {
-      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+      return createBestInnerJoinPlan(context, plan, node, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    List<LogicalNode> childs = plan.getChilds(node);
+    String [] leftLineage = PlannerUtil.getLineage(plan, childs.get(0));
+    String [] rightLineage = PlannerUtil.getLineage(plan, childs.get(1));
     long leftSize = estimateSizeRecursive(context, leftLineage);
     long rightSize = estimateSizeRecursive(context, rightLineage);
 
@@ -318,10 +389,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         selectedOuter = leftExec;
       }
 
-      LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
-      return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+      LOG.info("Join (" + node.getPID() +") chooses [InMemory Hash Join]");
+      return new HashJoinExec(context, node, selectedOuter, selectedInner);
     } else {
-      return createMergeInnerJoin(context, plan, leftExec, rightExec);
+      return createMergeInnerJoin(context, node, leftExec, rightExec);
     }
   }
 
@@ -340,58 +411,58 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
   }
 
-  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+          return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
         case NESTED_LOOP_JOIN:
           //the right operand is too large, so we opt for NL implementation of left outer join
-          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-          return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+          return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
         default:
           LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
       }
     } else {
-      return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+      return createBestLeftOuterJoinPlan(context, plan, join, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
     long rightTableVolume = estimateSizeRecursive(context, rightLineage);
 
     if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
       // we can implement left outer join using hash join, using the right operand as the build relation
-      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+      return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
     }
     else {
       //the right operand is too large, so we opt for NL implementation of left outer join
-      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
-      return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+      return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
     // blocking, but merge join is blocking as well)
-    String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] outerLineage4 = PlannerUtil.getLineage(plan.getChild(join, 0));
     long outerSize = estimateSizeRecursive(context, outerLineage4);
     if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
-      LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-      return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+      LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+      return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
     } else {
-      return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
     }
   }
 
@@ -408,56 +479,56 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
   }
 
-  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                 PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
-          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+          LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+          return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
         case MERGE_JOIN:
-          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+          return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
         default:
           LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
-          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+          return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
       }
     } else {
-      return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+      return createBestRightJoinPlan(context, plan, join, leftExec, rightExec);
     }
   }
 
-  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+          return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
 
         case MERGE_JOIN:
-          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+          return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
 
         default:
           LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
-          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+          return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
       }
     } else {
-      return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+      return createBestFullOuterJoinPlan(context, plan, join, leftExec, rightExec);
     }
   }
 
-  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                             PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+    String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
     long outerSize2 = estimateSizeRecursive(context, leftLineage);
     long innerSize2 = estimateSizeRecursive(context, rightLineage);
 
@@ -472,8 +543,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       selectedLeft = rightExec;
       selectedRight = leftExec;
     }
-    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
-    return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
+    LOG.info("Full Outer Join (" + join.getPID() +") chooses [Hash Join]");
+    return new HashFullOuterJoinExec(context, join, selectedRight, selectedLeft);
   }
 
   private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -491,117 +562,117 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
   }
 
-  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+    String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
     long outerSize2 = estimateSizeRecursive(context, leftLineage);
     long innerSize2 = estimateSizeRecursive(context, rightLineage);
     final long threshold = 1048576 * 128;
     if (outerSize2 < threshold || innerSize2 < threshold) {
-      return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+      return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
     } else {
-      return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
     }
   }
 
   /**
    *  Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
    */
-  private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
 
         default:
           LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftSemiJoinExec(context, join, leftExec, rightExec); // fallback must stay a semi join
       }
     } else {
-      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-      return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
     }
   }
 
   /**
    *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
    */
-  private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+          LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
 
         default:
           LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+          return new HashLeftSemiJoinExec(context, join, rightExec, leftExec); // fallback must stay a semi join
       }
     } else {
-      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+      LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
     }
   }
 
   /**
    *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
    */
-  private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+          LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
 
         default:
           LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+          return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
       }
     } else {
-      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-      return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
     }
   }
 
   /**
    *  Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
    */
-  private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+  private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     Enforcer enforcer = context.getEnforcer();
-    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
     if (property != null) {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
       switch (algorithm) {
         case IN_MEMORY_HASH_JOIN:
-          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+          LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, join, rightExec, leftExec); // right anti = left anti, operands swapped
 
         default:
           LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
           LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
-          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+          return new HashLeftAntiJoinExec(context, join, rightExec, leftExec); // fallback must stay an anti join
       }
     } else {
-      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
-      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+      LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, join, rightExec, leftExec); // right anti = left anti, operands swapped
     }
   }
 
@@ -609,7 +680,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                       StoreTableNode plan, PhysicalExec subOp) throws IOException {
     if (plan.getPartitionType() == PartitionType.HASH_PARTITION
         || plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
-      switch (ctx.getDataChannel().getPartitionType()) {
+      DataChannel channel = null;
+      for (DataChannel outChannel : ctx.getOutgoingChannels()) {
+        if (outChannel.getSrcPID() == plan.getPID()) {
+          channel = outChannel;
+        }
+      }
+      switch (channel.getPartitionType()) {
         case HASH_PARTITION:
           return new PartitionedStoreExec(ctx, sm, plan, subOp);
 
@@ -620,10 +697,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           if (sortExec != null) {
             sortSpecs = sortExec.getSortSpecs();
           } else {
-            Column[] columns = ctx.getDataChannel().getPartitionKey();
-            SortSpec specs[] = new SortSpec[columns.length];
+            Column[] columns = channel.getPartitionKey();
+            sortSpecs= new SortSpec[columns.length];
             for (int i = 0; i < columns.length; i++) {
-              specs[i] = new SortSpec(columns[i]);
+              sortSpecs[i] = new SortSpec(columns[i]);
             }
           }
 
@@ -642,11 +719,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
         "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
 
-    FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+    FragmentProto[] fragments = ctx.getTables(scanNode.getCanonicalName());
     return new SeqScanExec(ctx, sm, scanNode, fragments);
   }
 
-  public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
+  public PhysicalExec createGroupByPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
+                                        PhysicalExec subOp)
       throws IOException {
 
     Enforcer enforcer = context.getEnforcer();
@@ -659,7 +737,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         return createSortAggregation(context, property, groupbyNode, subOp);
       }
     }
-    return createBestAggregationPlan(context, groupbyNode, subOp);
+    return createBestAggregationPlan(context, plan, groupbyNode, subOp);
   }
 
   private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -678,7 +756,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
 
     if (property != null) {
-      List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+      List<SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
       SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
       int i = 0;
 
@@ -692,20 +770,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     SortNode sortNode = new SortNode(-1, sortSpecs);
     sortNode.setInSchema(subOp.getSchema());
     sortNode.setOutSchema(subOp.getSchema());
-    // SortExec sortExec = new SortExec(sortNode, child);
     ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
     LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
     return new SortAggregateExec(ctx, groupbyNode, sortExec);
   }
 
-  private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+  private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
                                                  PhysicalExec subOp) throws IOException {
     Column[] grpColumns = groupbyNode.getGroupingColumns();
     if (grpColumns.length == 0) {
       return createInMemoryHashAggregation(context, groupbyNode, subOp);
     }
 
-    String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+    String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(groupbyNode).get(0));
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
     final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
 
@@ -719,7 +796,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+  public PhysicalExec createSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
     Enforcer enforcer = context.getEnforcer();
     EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
@@ -732,16 +809,16 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       }
     }
 
-    return createBestSortPlan(context, sortNode, child);
+    return createBestSortPlan(context, plan, sortNode, child);
   }
 
-  public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+  public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
-    String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+    String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(sortNode).get(0));
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
     final long threshold = 1048576 * 2000;
 
-    // if the relation size is less than the reshold,
+    // if the relation size is less than the threshold,
     // the in-memory sort will be used.
     if (estimatedSize <= threshold) {
       return new MemSortExec(context, sortNode, child);
@@ -757,17 +834,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
         "Error: There is no table matched to %s", annotation.getCanonicalName());
 
-    FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+    FragmentProto[] fragmentProtos = ctx.getTables(annotation.getTableName());
     List<FileFragment> fragments =
         FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
 
-    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
+    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0),
+        annotation.getSortKeys());
     Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
 
     TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());
-    return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
-        annotation.getKeySchema(), comp, annotation.getDatum());
+    return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(
+        indexPath, indexName), annotation.getKeySchema(), comp,
+        annotation.getDatum());
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 9371463..3b8bd08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,6 +29,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.storage.TupleComparator;
@@ -61,6 +62,17 @@ public class PlannerUtil {
     }
     return tableNames;
   }
+
+  public static String [] getLineage(ExecutionPlan plan, LogicalNode node) {
+    LogicalNode [] scans =  PlannerUtil.findAllNodes(plan, node, NodeType.SCAN);
+    String [] tableNames = new String[scans.length];
+    ScanNode scan;
+    for (int i = 0; i < scans.length; i++) {
+      scan = (ScanNode) scans[i];
+      tableNames[i] = scan.getCanonicalName();
+    }
+    return tableNames;
+  }
   
   /**
    * Delete the logical node from a plan.
@@ -151,6 +163,97 @@ public class PlannerUtil {
     parentNode.setChild(newNode);
   }
 
+  public static GroupbyNode[] transformGroupbyTo2Pv2(LogicalPlan plan, GroupbyNode groupBy) {
+    Preconditions.checkNotNull(groupBy);
+
+    GroupbyNode parent = null, child = null;
+
+    // cloning groupby node
+    try {
+      parent = groupBy;
+      child = (GroupbyNode) groupBy.clone();
+      child.setPid(plan.newPID());
+    } catch (CloneNotSupportedException e) {
+      e.printStackTrace();
+    }
+
+    List<Target> firstStepTargets = Lists.newArrayList();
+    Target[] secondTargets = parent.getTargets();
+    Target[] firstTargets = child.getTargets();
+
+    Target second;
+    Target first;
+    int targetId =  0;
+    for (int i = 0; i < firstTargets.length; i++) {
+      second = secondTargets[i];
+      first = firstTargets[i];
+
+      List<AggregationFunctionCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
+      List<AggregationFunctionCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+
+      if (firstStepFunctions.size() == 0) {
+        firstStepTargets.add(first);
+        targetId++;
+      } else {
+        for (AggregationFunctionCallEval func : firstStepFunctions) {
+          Target newTarget;
+
+          if (func.isDistinct()) {
+            List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
+            newTarget = new Target(new FieldEval(fields.get(0)));
+            String targetName = "column_" + (targetId++);
+            newTarget.setAlias(targetName);
+
+            AggregationFunctionCallEval secondFunc = null;
+            for (AggregationFunctionCallEval sf : secondStepFunctions) {
+              if (func.equals(sf)) {
+                secondFunc = sf;
+                break;
+              }
+            }
+
+            secondFunc.setArgs(new EvalNode [] {new FieldEval(
+                new Column(targetName, newTarget.getEvalTree().getValueType()))});
+          } else {
+            func.setFirstPhase();
+            newTarget = new Target(func);
+            String targetName = "column_" + (targetId++);
+            newTarget.setAlias(targetName);
+
+            AggregationFunctionCallEval secondFunc = null;
+            for (AggregationFunctionCallEval sf : secondStepFunctions) {
+              if (func.equals(sf)) {
+                secondFunc = sf;
+                break;
+              }
+            }
+            secondFunc.setArgs(new EvalNode [] {new FieldEval(
+                new Column(targetName, newTarget.getEvalTree().getValueType()))});
+          }
+          firstStepTargets.add(newTarget);
+        }
+      }
+
+      // Getting new target list and updating input/output schema from the new target list.
+      Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
+      Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
+      List<Target> newTarget = Lists.newArrayList();
+      for (Column column : parent.getGroupingColumns()) {
+        if (!targetSchema.contains(column.getQualifiedName())) {
+          newTarget.add(new Target(new FieldEval(column)));
+        }
+      }
+      targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
+
+      child.setTargets(targetArray);
+      child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
+      // set the groupby chaining
+      groupBy.setInSchema(child.getOutSchema());
+
+    }
+    return new GroupbyNode[] {parent, child};
+  }
+
   public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
     Preconditions.checkNotNull(groupBy);
 
@@ -241,6 +344,21 @@ public class PlannerUtil {
     return child;
   }
 
+  public static SortNode[] transformSortTo2p(LogicalPlan plan, SortNode sort) {
+    Preconditions.checkArgument(sort != null);
+    SortNode parent = null, child = null;
+    try {
+      parent = sort;
+      child = (SortNode) sort.clone();
+      child.setPid(plan.newPID());
+    } catch (CloneNotSupportedException e) {
+      LOG.warn(e);
+    }
+
+    parent.setInSchema(child.getOutSchema());
+
+    return new SortNode[]{parent, child};
+  }
   
   /**
    * Find the top logical node matched to type from the given node
@@ -262,6 +380,88 @@ public class PlannerUtil {
     return (T) finder.getFoundNodes().get(0);
   }
 
+  private static class LogicalNodeFinderForExecPlan {
+    private NodeType type;
+    private ExecutionPlan plan;
+    private LogicalNode node;
+    private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+    public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan, LogicalNode node) {
+      this.type = type;
+      this.plan = plan;
+      this.node = node;
+    }
+
+    public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+      this(type, plan, plan.getTerminalNode());
+    }
+
+    public void find() {
+      this.visit(node);
+    }
+
+    private void visit(LogicalNode node) {
+      if (plan.getChildCount(node) > 0) {
+        for (LogicalNode child : plan.getChilds(node)) {
+          this.visit(child);
+        }
+      }
+
+      if (node.getType() == type) {
+        foundNodes.add(node);
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return foundNodes;
+    }
+  }
+
+  private static class ParentNodeFinderForExecPlan {
+    private NodeType type;
+    private ExecutionPlan plan;
+    private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+    public ParentNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+      this.type = type;
+      this.plan = plan;
+    }
+
+    public void find() {
+      this.visit(plan.getTerminalNode());
+    }
+
+    private void visit(LogicalNode node) {
+      if (plan.getChildCount(node) > 0) {
+        for (LogicalNode child : plan.getChilds(node)) {
+          this.visit(child);
+        }
+        for (LogicalNode child : plan.getChilds(node)) {
+          if (child.getType() == type) {
+            foundNodes.add(child);
+          }
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return foundNodes;
+    }
+  }
+
+  public static <T extends LogicalNode> T findTopNode(ExecutionPlan executionPlan, NodeType type) {
+    Preconditions.checkNotNull(executionPlan);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
+    finder.find();
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
   /**
    * Find the all logical node matched to type from the given node
    *
@@ -282,6 +482,20 @@ public class PlannerUtil {
     List<LogicalNode> founds = finder.getFoundNodes();
     return founds.toArray(new LogicalNode[founds.size()]);
   }
+
+  public static LogicalNode [] findAllNodes(ExecutionPlan plan, LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, plan, node);
+    finder.find();
+
+    if (finder.getFoundNodes().size() == 0) {
+      return new LogicalNode[] {};
+    }
+    List<LogicalNode> founds = finder.getFoundNodes();
+    return founds.toArray(new LogicalNode[founds.size()]);
+  }
   
   /**
    * Find a parent node of a given-typed operator.
@@ -303,6 +517,19 @@ public class PlannerUtil {
     return (T) finder.getFoundNodes().get(0);
   }
 
+  public static <T extends LogicalNode> T findTopParentNode(ExecutionPlan node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    ParentNodeFinderForExecPlan finder = new ParentNodeFinderForExecPlan(type, node);
+    finder.find();
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
   public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
     Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 0401718..26cedd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -38,23 +38,31 @@ public class DataChannel {
 
   private StoreType storeType = StoreType.CSV;
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+  private Integer srcPID;
+  private Integer targetPID;
+
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
     this.srcId = srcId;
     this.targetId = targetId;
+    this.srcPID = srcPID;
+    this.targetPID = targetPID;
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
-    this(srcId, targetId);
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+                     PartitionType partitionType) {
+    this(srcId, targetId, srcPID, targetPID);
     this.partitionType = partitionType;
   }
 
-  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
-    this(src.getId(), target.getId(), partitionType, partNum);
-    setSchema(src.getPlan().getOutSchema());
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, Integer srcPID, Integer targetPID,
+                     PartitionType partitionType, int partNum, Schema schema) {
+    this(src.getId(), target.getId(), srcPID, targetPID, partitionType, partNum);
+    setSchema(schema);
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
-    this(srcId, targetId, partitionType);
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+                     PartitionType partitionType, int partNum) {
+    this(srcId, targetId, srcPID, targetPID, partitionType);
     this.partitionNum = partNum;
   }
 
@@ -77,6 +85,12 @@ public class DataChannel {
     if (proto.hasPartitionNum()) {
       this.partitionNum = proto.getPartitionNum();
     }
+    if (proto.hasSrcPID()) {
+      this.srcPID = proto.getSrcPID();
+    }
+    if (proto.hasTargetPID()) {
+      this.targetPID = proto.getTargetPID();
+    }
   }
 
   public ExecutionBlockId getSrcId() {
@@ -163,6 +177,12 @@ public class DataChannel {
     if (partitionNum != null) {
       builder.setPartitionNum(partitionNum);
     }
+    if (srcPID != null) {
+      builder.setSrcPID(srcPID);
+    }
+    if (targetPID != null) {
+      builder.setTargetPID(targetPID);
+    }
     return builder.build();
   }
 
@@ -177,7 +197,7 @@ public class DataChannel {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("[").append(srcId.getQueryId()).append("] ");
-    sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+    sb.append(srcId.getId()).append("."+srcPID).append(" => ").append(targetId.getId()).append("."+targetPID);
     sb.append(" (type=").append(partitionType);
     if (hasPartitionKey()) {
       sb.append(", key=");
@@ -195,4 +215,16 @@ public class DataChannel {
     sb.append(")");
     return sb.toString();
   }
+
+  public void updateSrcPID(int srcPID) {
+    this.srcPID = srcPID;
+  }
+
+  public Integer getSrcPID() {
+    return srcPID;
+  }
+
+  public Integer getTargetPID() {
+    return targetPID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
new file mode 100644
index 0000000..3c2152c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+
+public class DestinationContext {
+  @Expose private String destTableName;
+  @Expose private StoreType storeType = StoreType.CSV;
+  @Expose private NodeType terminalNodeType;
+  @Expose private Schema outputSchema;
+  @Expose private Column[] groupingColumns;
+  @Expose private SortSpec[] sortSpecs;
+
+  public DestinationContext() {
+
+  }
+
+  public DestinationContext(LogicalNode node) {
+    this.set(node);
+  }
+
+  public void set(LogicalNode node) {
+    terminalNodeType = node.getType();
+    outputSchema = node.getOutSchema();
+    if (terminalNodeType.equals(NodeType.GROUP_BY)) {
+      groupingColumns = ((GroupbyNode)node).getGroupingColumns();
+    } else if (terminalNodeType.equals(NodeType.SORT)) {
+      sortSpecs = ((SortNode)node).getSortKeys();
+    }
+  }
+
+  public NodeType getTerminalNodeType() {
+    return terminalNodeType;
+  }
+
+  public void setTerminalNodeType(NodeType terminalNodeType) {
+    this.terminalNodeType = terminalNodeType;
+  }
+
+  public Schema getOutputSchema() {
+    return outputSchema;
+  }
+
+  public void setOutputSchema(Schema outputSchema) {
+    this.outputSchema = outputSchema;
+  }
+
+  public Column [] getGroupingColumns() {
+    return groupingColumns;
+  }
+
+  public void setGroupingColumns(Column [] groupingColumns) {
+    this.groupingColumns = groupingColumns;
+  }
+
+  public SortSpec[] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public void setSortSpecs(SortSpec[] sortSpecs) {
+    this.sortSpecs = sortSpecs;
+  }
+
+  public StoreType getStoreType() {
+    return storeType;
+  }
+
+  public void setStoreType(StoreType storeType) {
+    this.storeType = storeType;
+  }
+
+  public String getDestTableName() {
+    return destTableName;
+  }
+
+  public void setDestTableName(String destTableName) {
+    this.destTableName = destTableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index efa1c7f..7cfa478 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -14,12 +14,15 @@
 
 package org.apache.tajo.engine.planner.global;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@ -30,16 +33,17 @@ import java.util.*;
  */
 public class ExecutionBlock {
   private ExecutionBlockId executionBlockId;
-  private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+  private ExecutionPlan executionPlan;
   private Enforcer enforcer = new Enforcer();
 
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
-
   private Set<String> broadcasted = new HashSet<String>();
 
+  public ExecutionBlock(ExecutionBlockId executionBlockId, LogicalRootNode rootNode) {
+    this.executionBlockId = executionBlockId;
+    this.executionPlan = new ExecutionPlan(rootNode);
+  }
+
+  @VisibleForTesting
   public ExecutionBlock(ExecutionBlockId executionBlockId) {
     this.executionBlockId = executionBlockId;
   }
@@ -49,64 +53,27 @@ public class ExecutionBlock {
   }
 
   public void setPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    hasUnionPlan = false;
-    this.scanlist.clear();
-    this.plan = plan;
-
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getChild());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == NodeType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == NodeType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getLeftChild());
-        s.add(s.size(), binary.getRightChild());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      } else if (node instanceof TableSubQueryNode) {
-        TableSubQueryNode subQuery = (TableSubQueryNode) node;
-        s.add(s.size(), subQuery.getSubQuery());
-      }
-    }
+    executionPlan.setPlan(plan);
   }
 
-
-  public LogicalNode getPlan() {
-    return plan;
+  public ExecutionPlan getPlan() {
+    return executionPlan;
   }
 
   public Enforcer getEnforcer() {
     return enforcer;
   }
 
-  public StoreTableNode getStoreTableNode() {
-    return store;
-  }
-
-  public ScanNode [] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
-  }
-
-  public Schema getOutputSchema() {
-    return store.getOutSchema();
+  public InputContext getInputContext() {
+    return executionPlan.getInputContext();
   }
 
   public boolean hasJoin() {
-    return hasJoinPlan;
+    return executionPlan.hasJoinPlan();
   }
 
   public boolean hasUnion() {
-    return hasUnionPlan;
+    return executionPlan.hasUnionPlan();
   }
 
   public void addBroadcastTables(Collection<String> tableNames) {