You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:33 UTC

[6/7] TAJO-184: Refactor GlobalPlanner and global plan data structure. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index e9ed087..351b6fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Stack;
 
 import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
 
 /**
  * This class creates a logical plan from a parse tree ({@link org.apache.tajo.engine.parser.SQLAnalyzer})
@@ -135,6 +136,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
   }
 
+  /** Plans the subquery in a block of its own, connects that block to the current one, and wraps the result. */
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+    QueryBlock subBlock = context.plan.newAndGetBlock(expr.getName());
+    PlanContext subContext = new PlanContext(context.plan, subBlock);
+    // The subquery is visited with its own, initially empty operator stack.
+    LogicalNode subPlan = visitChild(subContext, new Stack<OpType>(), expr.getSubQuery());
+    context.plan.connectBlocks(subContext.block, context.block, BlockType.TableSubQuery);
+    return new TableSubQueryNode(expr.getName(), subPlan);
+  }
+
   @Override
   public ScanNode visitRelation(PlanContext context, Stack<OpType> stack, Relation expr)
       throws VerifyException {
@@ -307,22 +318,26 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     PlanContext leftContext = new PlanContext(plan, plan.newAnonymousBlock());
     Stack<OpType> leftStack = new Stack<OpType>();
     LogicalNode left = visitChild(leftContext, leftStack, setOperation.getLeft());
+    TableSubQueryNode leftSubQuery = new TableSubQueryNode(leftContext.block.getName(), left);
+    context.plan.connectBlocks(leftContext.block, context.block, BlockType.TableSubQuery);
 
     PlanContext rightContext = new PlanContext(plan, plan.newAnonymousBlock());
     Stack<OpType> rightStack = new Stack<OpType>();
     LogicalNode right = visitChild(rightContext, rightStack, setOperation.getRight());
+    TableSubQueryNode rightSubQuery = new TableSubQueryNode(rightContext.block.getName(), right);
+    context.plan.connectBlocks(rightContext.block, context.block, BlockType.TableSubQuery);
 
     verifySetStatement(setOperation.getType(), leftContext.block, rightContext.block);
 
     BinaryNode setOp;
     if (setOperation.getType() == OpType.Union) {
-      setOp = new UnionNode(left, right);
+      setOp = new UnionNode(leftSubQuery, rightSubQuery);
     } else if (setOperation.getType() == OpType.Except) {
-      setOp = new ExceptNode(left, right);
+      setOp = new ExceptNode(leftSubQuery, rightSubQuery);
     } else if (setOperation.getType() == OpType.Intersect) {
-      setOp = new IntersectNode(left, right);
+      setOp = new IntersectNode(leftSubQuery, rightSubQuery);
     } else {
-      throw new VerifyException(setOperation.toJson());
+      throw new VerifyException("Invalid Type: " + setOperation.getType());
     }
 
     // Strip the table names from the targets of the both blocks
@@ -330,10 +345,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     Target [] leftStrippedTargets = PlannerUtil.stripTarget(leftContext.block.getCurrentTargets());
 
     Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
-    setOp.setInSchema(left.getOutSchema());
+    setOp.setInSchema(leftSubQuery.getOutSchema());
     setOp.setOutSchema(outSchema);
-    setOp.setLeftChild(left);
-    setOp.setRightChild(right);
 
     if (isNoUpperProjection(stack)) {
       block.targetListManager = new TargetListManager(plan, leftStrippedTargets);
@@ -357,7 +370,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     for (int i = 0; i < targets1.length; i++) {
       if (!targets1[i].getDataType().equals(targets2[i].getDataType())) {
-        throw new VerifyException("UNION types " + targets1[i].getDataType().getType() + " and "
+        throw new VerifyException(type + " types " + targets1[i].getDataType().getType() + " and "
             + targets2[i].getDataType().getType() + " cannot be matched");
       }
     }
@@ -428,7 +441,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       // 4. Set Child Plan and Update Input Schemes Phase
       groupingNode.setChild(child);
       block.setGroupingNode(groupingNode);
-      groupingNode.setInSchema(child.getInSchema());
+      groupingNode.setInSchema(child.getOutSchema());
 
       // 5. Update Output Schema and Targets for Upper Plan
 
@@ -438,7 +451,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       List<Column[]> cuboids  = generateCuboids(annotateGroupingColumn(plan, block.getName(),
           groupElements[0].getColumns(), child));
       UnionNode topUnion = createGroupByUnion(plan, block, child, cuboids, 0);
-      block.resolveGrouping();
+      block.needToResolveGrouping();
       block.getTargetListManager().setEvaluatedAll();
 
       return topUnion;
@@ -827,7 +840,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     PlanContext newContext = new PlanContext(ctx.plan, newQueryBlock);
     Stack<OpType> subStack = new Stack<OpType>();
     LogicalNode subQuery = visitChild(newContext, subStack, expr.getSubQuery());
-    ctx.plan.connectBlocks(ctx.block, newQueryBlock, QueryBlockGraph.BlockType.TableSubQuery);
+    ctx.plan.connectBlocks(newQueryBlock, ctx.block, BlockType.TableSubQuery);
     stack.pop();
 
     InsertNode insertNode = null;
@@ -849,7 +862,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         }
       }
 
-      checkInsertDomains(targetSchema, subQuery.getOutSchema());
+      ensureDomains(targetSchema, subQuery.getOutSchema());
       insertNode = new InsertNode(desc, subQuery);
       insertNode.setTargetSchema(targetSchema);
       insertNode.setOutSchema(targetSchema);
@@ -872,12 +885,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return insertNode;
   }
 
-  private static void checkInsertDomains(Schema targetTableScheme, Schema insertSchema)
+  /**
+   * This ensures that corresponding columns in both tables are equivalent to each other.
+   */
+  private static void ensureDomains(Schema targetTableScheme, Schema schema)
       throws PlanningException {
-    for (int i = 0; i < insertSchema.getColumnNum(); i++) {
-      if (!insertSchema.getColumn(i).getDataType().equals(targetTableScheme.getColumn(i).getDataType())) {
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      if (!schema.getColumn(i).getDataType().equals(targetTableScheme.getColumn(i).getDataType())) {
         Column targetColumn = targetTableScheme.getColumn(i);
-        Column insertColumn = insertSchema.getColumn(i);
+        Column insertColumn = schema.getColumn(i);
         throw new PlanningException("ERROR: " +
             insertColumn.getColumnName() + " is of type " + insertColumn.getDataType().getType().name() +
                 ", but target column '" + targetColumn.getColumnName() + "' is of type " +

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 7d68e19..365731b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -25,9 +25,11 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
@@ -39,6 +41,8 @@ import org.apache.tajo.util.IndexUtil;
 
 import java.io.IOException;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
   protected final TajoConf conf;
@@ -49,19 +53,42 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     this.sm = sm;
   }
 
-  public PhysicalExec createPlan(final TaskAttemptContext context,
-      final LogicalNode logicalPlan) throws InternalException {
+  public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
+      throws InternalException {
 
-    PhysicalExec plan;
+    PhysicalExec execPlan;
 
     try {
-      plan = createPlanRecursive(context, logicalPlan);
-
+      execPlan = createPlanRecursive(context, logicalPlan);
+      if (execPlan instanceof StoreTableExec || execPlan instanceof IndexedStoreExec
+          || execPlan instanceof PartitionedStoreExec) {
+        return execPlan;
+      } else if (context.getDataChannel() != null) {
+        return buildOutputOperator(context, logicalPlan, execPlan);
+      } else {
+        return execPlan;
+      }
     } catch (IOException ioe) {
       throw new InternalException(ioe);
     }
+  }
 
-    return plan;
+  /** Wraps {@code execPlan} in a store operator configured from the task's data channel. */
+  private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
+                                           PhysicalExec execPlan) throws IOException {
+    DataChannel channel = context.getDataChannel();
+    StoreTableNode storeNode = new StoreTableNode(channel.getTargetId().toString());
+    storeNode.setStorageType(CatalogProtos.StoreType.CSV);
+    storeNode.setInSchema(execPlan.getSchema());
+    storeNode.setOutSchema(execPlan.getSchema());
+    if (channel.getPartitionType() == PartitionType.NONE_PARTITION) {
+      storeNode.setDefaultParition();
+    } else {
+      storeNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
+    }
+    storeNode.setChild(plan);
+    // The store node only describes the output; createStorePlan builds the executor on top of execPlan.
+    return createStorePlan(context, storeNode, execPlan);
   }
 
   private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
@@ -93,7 +120,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         outer = createPlanRecursive(ctx, prjNode.getChild());
         return new ProjectionExec(ctx, prjNode, outer);
 
-      case SCAN:
+      case TABLE_SUBQUERY: {
+        TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
+        outer = createPlanRecursive(ctx, subQueryNode.getSubQuery());
+        return outer;
+
+      } case SCAN:
         outer = createScanPlan(ctx, (ScanNode) logicalNode);
         return outer;
 
@@ -202,17 +234,20 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public PhysicalExec createStorePlan(TaskAttemptContext ctx,
                                       StoreTableNode plan, PhysicalExec subOp) throws IOException {
-    if (plan.hasPartitionKey()) {
-      switch (plan.getPartitionType()) {
-        case HASH:
+    if (plan.getPartitionType() == PartitionType.HASH_PARTITION
+        || plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
+      switch (ctx.getDataChannel().getPartitionType()) {
+        case HASH_PARTITION:
           return new PartitionedStoreExec(ctx, sm, plan, subOp);
 
-        case RANGE:
+        case RANGE_PARTITION:
+          SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
+
           SortSpec [] sortSpecs = null;
-          if (subOp instanceof SortExec) {
-            sortSpecs = ((SortExec)subOp).getSortSpecs();
+          if (sortExec != null) {
+            sortSpecs = sortExec.getSortSpecs();
           } else {
-            Column[] columns = plan.getPartitionKeys();
+            Column[] columns = ctx.getDataChannel().getPartitionKey();
             SortSpec specs[] = new SortSpec[columns.length];
             for (int i = 0; i < columns.length; i++) {
               specs[i] = new SortSpec(columns[i]);
@@ -232,10 +267,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
       throws IOException {
-    Preconditions.checkNotNull(ctx.getTable(scanNode.getTableId()),
-        "Error: There is no table matched to %s", scanNode.getTableId());
+    Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
+        "Error: There is no table matched to %s", scanNode.getTableName());
 
-    Fragment[] fragments = ctx.getTables(scanNode.getTableId());
+    Fragment[] fragments = ctx.getTables(scanNode.getTableName());
     return new SeqScanExec(ctx, sm, scanNode, fragments);
   }
 
@@ -281,14 +316,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                           IndexScanNode annotation)
       throws IOException {
     //TODO-general Type Index
-    Preconditions.checkNotNull(ctx.getTable(annotation.getTableId()),
-        "Error: There is no table matched to %s", annotation.getTableId());
+    Preconditions.checkNotNull(ctx.getTable(annotation.getTableName()),
+        "Error: There is no table matched to %s", annotation.getTableName());
 
-    Fragment[] fragments = ctx.getTables(annotation.getTableId());
+    Fragment[] fragments = ctx.getTables(annotation.getTableName());
 
     String indexName = IndexUtil.getIndexNameOfFrag(fragments[0],
         annotation.getSortKeys());
-    Path indexPath = new Path(sm.getTablePath(annotation.getTableId()), "index");
+    Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
 
     TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
new file mode 100644
index 0000000..ef8bed0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanString.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects the text that describes one plan element: a title and two lists of
+ * strings, the explanations and the details.
+ *
+ * <p>Each list has at most one pending entry. The {@code append*} methods extend the
+ * pending entry; the {@code add*} methods and the list getters move it into the list.</p>
+ */
+public class PlanString {
+  final StringBuilder title;
+
+  final List<String> explanations = new ArrayList<String>();
+  final List<String> details = new ArrayList<String>();
+
+  /** Explanation entry still being built; {@code null} when there is none. */
+  StringBuilder currentExplanation;
+  /** Detail entry still being built; {@code null} when there is none. */
+  StringBuilder currentDetail;
+
+  /**
+   * @param title initial title text
+   */
+  public PlanString(String title) {
+    this.title = new StringBuilder(title);
+  }
+
+  /**
+   * Appends text to the title.
+   *
+   * @param str text to append
+   * @return this instance, for chaining
+   */
+  public PlanString appendTitle(String str) {
+    title.append(str);
+    return this;
+  }
+
+  /**
+   * Starts a new explanation entry. A pending entry, if any, is moved into the
+   * explanation list first.
+   *
+   * @param explain initial text of the new entry
+   * @return this instance, for chaining
+   */
+  public PlanString addExplain(String explain) {
+    flushCurrentExplanation();
+    currentExplanation = new StringBuilder(explain);
+    return this;
+  }
+
+  /**
+   * Same as {@link #addExplain(String)}. The misspelled name is kept so that existing
+   * callers keep compiling.
+   *
+   * @param explain initial text of the new entry
+   * @return this instance, for chaining
+   */
+  public PlanString addExplan(String explain) {
+    return addExplain(explain);
+  }
+
+  /**
+   * Appends text to the pending explanation entry, creating an empty one if none is pending.
+   *
+   * @param explain text to append
+   * @return this instance, for chaining
+   */
+  public PlanString appendExplain(String explain) {
+    if (currentExplanation == null) {
+      currentExplanation = new StringBuilder();
+    }
+    currentExplanation.append(explain);
+    return this;
+  }
+
+  /**
+   * Starts a new detail entry. A pending entry, if any, is moved into the detail list first.
+   *
+   * @param detail initial text of the new entry
+   * @return this instance, for chaining
+   */
+  public PlanString addDetail(String detail) {
+    flushCurrentDetail();
+    currentDetail = new StringBuilder(detail);
+    return this;
+  }
+
+  /**
+   * Appends text to the pending detail entry, creating an empty one if none is pending.
+   *
+   * @param detail text to append
+   * @return this instance, for chaining
+   */
+  public PlanString appendDetail(String detail) {
+    if (currentDetail == null) {
+      currentDetail = new StringBuilder();
+    }
+    currentDetail.append(detail);
+    return this;
+  }
+
+  /**
+   * @return the title text accumulated so far
+   */
+  public String getTitle() {
+    return title.toString();
+  }
+
+  /**
+   * Moves the pending explanation entry, if any, into the list and returns the list.
+   *
+   * @return the internal explanation list (not a copy)
+   */
+  public List<String> getExplanations() {
+    flushCurrentExplanation();
+    return explanations;
+  }
+
+  /**
+   * Moves the pending detail entry, if any, into the list and returns the list.
+   *
+   * @return the internal detail list (not a copy)
+   */
+  public List<String> getDetails() {
+    flushCurrentDetail();
+    return details;
+  }
+
+  /** Moves the pending explanation entry into the list and clears it. */
+  private void flushCurrentExplanation() {
+    if (currentExplanation != null) {
+      explanations.add(currentExplanation.toString());
+      currentExplanation = null;
+    }
+  }
+
+  /** Moves the pending detail entry into the list and clears it. */
+  private void flushCurrentDetail() {
+    if (currentDetail != null) {
+      details.add(currentDetail.toString());
+      currentDetail = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3f686af..a06b07e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -61,7 +61,7 @@ public class PlannerUtil {
     ScanNode scan;
     for (int i = 0; i < scans.length; i++) {
       scan = (ScanNode) scans[i];
-      tableNames[i] = scan.getTableId();
+      tableNames[i] = scan.getTableName();
     }
     return tableNames;
   }
@@ -127,65 +127,16 @@ public class PlannerUtil {
     parentNode.setChild(newNode);
   }
   
-  public static LogicalNode insertOuterNode(LogicalNode parent, LogicalNode outer) {
-    Preconditions.checkArgument(parent instanceof BinaryNode);
-    Preconditions.checkArgument(outer instanceof UnaryNode);
-    
-    BinaryNode p = (BinaryNode) parent;
-    LogicalNode c = p.getLeftChild();
-    UnaryNode m = (UnaryNode) outer;
-    m.setInSchema(c.getOutSchema());
-    m.setOutSchema(c.getOutSchema());
-    m.setChild(c);
-    p.setLeftChild(m);
-    return p;
-  }
-  
-  public static LogicalNode insertInnerNode(LogicalNode parent, LogicalNode inner) {
-    Preconditions.checkArgument(parent instanceof BinaryNode);
-    Preconditions.checkArgument(inner instanceof UnaryNode);
-    
-    BinaryNode p = (BinaryNode) parent;
-    LogicalNode c = p.getRightChild();
-    UnaryNode m = (UnaryNode) inner;
-    m.setInSchema(c.getOutSchema());
-    m.setOutSchema(c.getOutSchema());
-    m.setChild(c);
-    p.setRightChild(m);
-    return p;
-  }
-  
-  public static LogicalNode insertNode(LogicalNode parent, 
-      LogicalNode left, LogicalNode right) {
-    Preconditions.checkArgument(parent instanceof BinaryNode);
-    Preconditions.checkArgument(left instanceof UnaryNode);
-    Preconditions.checkArgument(right instanceof UnaryNode);
-    
-    BinaryNode p = (BinaryNode)parent;
-    LogicalNode lc = p.getLeftChild();
-    LogicalNode rc = p.getRightChild();
-    UnaryNode lm = (UnaryNode)left;
-    UnaryNode rm = (UnaryNode)right;
-    lm.setInSchema(lc.getOutSchema());
-    lm.setOutSchema(lc.getOutSchema());
-    lm.setChild(lc);
-    rm.setInSchema(rc.getOutSchema());
-    rm.setOutSchema(rc.getOutSchema());
-    rm.setChild(rc);
-    p.setLeftChild(lm);
-    p.setRightChild(rm);
-    return p;
-  }
-  
-  public static LogicalNode transformGroupbyTo2P(GroupbyNode gp) {
-    Preconditions.checkNotNull(gp);
-        
+  public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
+    Preconditions.checkNotNull(groupBy);
+
+    GroupbyNode child = null;
     try {
       // cloning groupby node
-      GroupbyNode child = (GroupbyNode) gp.clone();
+      child = (GroupbyNode) groupBy.clone();
 
       List<Target> newChildTargets = Lists.newArrayList();
-      Target[] secondTargets = gp.getTargets();
+      Target[] secondTargets = groupBy.getTargets();
       Target[] firstTargets = child.getTargets();
 
       Target second;
@@ -231,45 +182,17 @@ public class PlannerUtil {
       child.setTargets(targetArray);
       child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
       // set the groupby chaining
-      gp.setChild(child);
-      gp.setInSchema(child.getOutSchema());
+      groupBy.setChild(child);
+      groupBy.setInSchema(child.getOutSchema());
     } catch (CloneNotSupportedException e) {
       LOG.error(e);
     }
     
-    return gp;
-  }
-  
-  public static LogicalNode transformSortTo2P(SortNode sort) {
-    Preconditions.checkNotNull(sort);
-    
-    try {
-      SortNode child = (SortNode) sort.clone();
-      sort.setChild(child);
-      sort.setInSchema(child.getOutSchema());
-      sort.setOutSchema(child.getOutSchema());
-    } catch (CloneNotSupportedException e) {
-      LOG.error(e);
-    }
-    return sort;
-  }
-  
-  public static LogicalNode transformGroupbyTo2PWithStore(GroupbyNode gb, 
-      String tableId) {
-    GroupbyNode groupby = (GroupbyNode) transformGroupbyTo2P(gb);
-    return insertStore(groupby, tableId);
-  }
-  
-  public static LogicalNode transformSortTo2PWithStore(SortNode sort, 
-      String tableId) {
-    SortNode sort2p = (SortNode) transformSortTo2P(sort);
-    return insertStore(sort2p, tableId);
+    return child;
   }
   
-  private static LogicalNode insertStore(LogicalNode parent, 
-      String tableId) {
-    StoreTableNode store = new StoreTableNode(tableId);
-    store.setLocal(true);
+  private static LogicalNode insertStore(LogicalNode parent, String tableName) {
+    StoreTableNode store = new StoreTableNode(tableName);
     insertNode(parent, store);
     
     return parent;
@@ -282,7 +205,7 @@ public class PlannerUtil {
    * @param type to find
    * @return a found logical node
    */
-  public static LogicalNode findTopNode(LogicalNode node, NodeType type) {
+  public static <T extends LogicalNode> T findTopNode(LogicalNode node, NodeType type) {
     Preconditions.checkNotNull(node);
     Preconditions.checkNotNull(type);
     
@@ -292,7 +215,7 @@ public class PlannerUtil {
     if (finder.getFoundNodes().size() == 0) {
       return null;
     }
-    return finder.getFoundNodes().get(0);
+    return (T) finder.getFoundNodes().get(0);
   }
 
   /**
@@ -344,8 +267,8 @@ public class PlannerUtil {
       Set<String> tableIds = Sets.newHashSet();
       // getting distinct table references
       for (Column col : columnRefs) {
-        if (!tableIds.contains(col.getTableName())) {
-          tableIds.add(col.getTableName());
+        if (!tableIds.contains(col.getQualifier())) {
+          tableIds.add(col.getQualifier());
         }
       }
 
@@ -373,11 +296,12 @@ public class PlannerUtil {
       return i.contains(it.next()) && o.contains(it.next());
 
     } else {
-      if (node instanceof ScanNode) {
-        ScanNode scan = (ScanNode) node;
+      if (node instanceof RelationNode) {
+
+        RelationNode scan = (RelationNode) node;
 
         for (Column col : columnRefs) {
-          if (scan.getTableId().equals(col.getTableName())) {
+          if (scan.getTableName().equals(col.getQualifier())) {
             Column found = node.getInSchema().getColumnByName(col.getColumnName());
             if (found == null) {
               return false;
@@ -461,12 +385,6 @@ public class PlannerUtil {
     }
   }
   
-  public static Set<Column> collectColumnRefs(LogicalNode node) {
-    ColumnRefCollector collector = new ColumnRefCollector();
-    node.postOrder(collector);
-    return collector.getColumns();
-  }
-  
   private static class ColumnRefCollector implements LogicalNodeVisitor {
     private Set<Column> collected = Sets.newHashSet();
     
@@ -609,7 +527,7 @@ public class PlannerUtil {
       List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
 
       if (left.size() == 1 && right.size() == 1 &&
-          !left.get(0).getTableName().equals(right.get(0).getTableName()))
+          !left.get(0).getQualifier().equals(right.get(0).getQualifier()))
         return true;
     }
 
@@ -717,7 +635,7 @@ public class PlannerUtil {
       }
       if (copy[i].getEvalTree().getType() == EvalType.FIELD) {
         FieldEval fieldEval = (FieldEval) copy[i].getEvalTree();
-        if (fieldEval.getColumnRef().isQualified()) {
+        if (fieldEval.getColumnRef().hasQualifier()) {
           fieldEval.getColumnRef().setName(fieldEval.getColumnName());
         }
       }
@@ -725,4 +643,33 @@ public class PlannerUtil {
 
     return copy;
   }
+
+  /**
+   * Clones a logical node and casts the clone to the type expected by the caller.
+   *
+   * @param node node to clone
+   * @return the clone, cast to {@code T}
+   * @throws RuntimeException wrapping the CloneNotSupportedException if the node cannot be cloned
+   */
+  @SuppressWarnings("unchecked") // T is chosen by the caller and cannot be checked here
+  public static <T extends LogicalNode> T clone(LogicalNode node) {
+    try {
+      return (T) node.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Clones a schema and calls {@code setQualifier(qualifier)} on the clone.
+   * {@code setQualifier} is not invoked on the given schema itself.
+   *
+   * @param targetSchema schema to clone
+   * @param qualifier value handed to {@code setQualifier} on the clone
+   * @return the clone
+   */
+  public static Schema getQualifiedSchema(Schema targetSchema, String qualifier) {
+    Schema qualified = (Schema) targetSchema.clone();
+    qualified.setQualifier(qualifier);
+    return qualified;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
index 543bbd7..12e4978 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -65,14 +65,14 @@ public class Projector {
             continue outer;
         }
 
-        col = inSchema.getColumn(outSchema.getColumn(targetId).getQualifiedName());
+        col = inSchema.getColumnByFQN(outSchema.getColumn(targetId).getQualifiedName());
         outMap[mapId] = targetId;
         inMap[mapId] = inSchema.getColumnId(col.getQualifiedName());
         mapId++;
       }
     } else {
       for (int targetId = 0; targetId < outSchema.getColumnNum(); targetId ++) {
-        col = inSchema.getColumn(outSchema.getColumn(targetId).getQualifiedName());
+        col = inSchema.getColumnByFQN(outSchema.getColumn(targetId).getQualifiedName());
         outMap[mapId] = targetId;
         inMap[mapId] = inSchema.getColumnId(col.getQualifiedName());
         mapId++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/QueryBlockGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/QueryBlockGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/QueryBlockGraph.java
deleted file mode 100644
index 8677ba8..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/QueryBlockGraph.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-import org.apache.tajo.util.TUtil;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * It provides a graph data structure for query blocks and their relations.
- */
-public class QueryBlockGraph {
-  private Map<String, List<BlockEdge>> blockEdges = new HashMap<String, List<BlockEdge>>();
-
-  public QueryBlockGraph() {
-  }
-
-  public int size() {
-    return blockEdges.size();
-  }
-
-  public void connectBlocks(String srcBlock, String targetBlock, BlockType type) {
-    BlockEdge newBlockEdge = new BlockEdge(targetBlock, type);
-    if (blockEdges.containsKey(srcBlock)) {
-      blockEdges.get(srcBlock).add(newBlockEdge);
-    } else {
-      blockEdges.put(srcBlock, TUtil.newList(newBlockEdge));
-    }
-  }
-
-  public Collection<BlockEdge> getBlockEdges(String srcBlock) {
-    return blockEdges.get(srcBlock);
-  }
-
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, List<BlockEdge>> entry : blockEdges.entrySet()) {
-      sb.append(entry.getKey()).append("\n");
-      for (BlockEdge edge : entry.getValue()) {
-        sb.append(" |- ");
-        sb.append(edge.getTargetBlock()).append(" (").append(edge.getBlockType()).append(")");
-      }
-      sb.append("\n");
-    }
-
-    return sb.toString();
-  }
-
-  public static enum BlockType {
-    TableSubQuery,
-    ScalarSubQuery
-  }
-
-  public static class BlockEdge {
-    private String targetBlock;
-    private BlockType blockType;
-
-    public BlockEdge(String targetBlock, BlockType blockType) {
-      this.targetBlock = targetBlock;
-      this.blockType = blockType;
-    }
-
-    public String getTargetBlock() {
-      return targetBlock;
-    }
-
-    public BlockType getBlockType() {
-      return blockType;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index c14fe88..670cbfb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -38,8 +38,7 @@ public class Target implements Cloneable, GsonObject {
 
   public Target(EvalNode expr) {
     this.expr = expr;
-    if (expr.getType() == EvalType.AGG_FUNCTION &&
-        expr.getValueType().length > 1) { // hack for partial result
+    if (expr.getType() == EvalType.AGG_FUNCTION && expr.getValueType().length > 1) { // hack for partial result
       this.column = new Column(expr.getName(), Type.ARRAY);
     } else {
       this.column = new Column(expr.getName(), expr.getValueType()[0]);
@@ -48,11 +47,16 @@ public class Target implements Cloneable, GsonObject {
 
   public Target(final EvalNode eval, final String alias) {
     this(eval);
-    this.alias = alias;
+    setAlias(alias);
+  }
+
+  public String getCanonicalName() {
+    return !hasAlias() ? column.getQualifiedName() : alias;
   }
 
   public final void setAlias(String alias) {
     this.alias = alias;
+    this.column = new Column(alias, expr.getValueType()[0]); // NOTE(review): also replaces the Type.ARRAY column the constructor builds for multi-valued AGG_FUNCTION exprs -- confirm intended
   }
 
   public final String getAlias() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
deleted file mode 100644
index 36190ef..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.global;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.UnaryNode;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
-
-public class GlobalOptimizer {
-
-  public GlobalOptimizer() {
-    
-  }
-  
-  public MasterPlan optimize(MasterPlan plan) {
-    ExecutionBlock reducedStep = reduceSchedules(plan.getRoot());
-
-    MasterPlan optimized = new MasterPlan(reducedStep);
-    optimized.setOutputTableName(plan.getOutputTable());
-
-    return optimized;
-  }
-  
-  @VisibleForTesting
-  private ExecutionBlock reduceSchedules(ExecutionBlock logicalUnit) {
-    reduceLogicalQueryUnitStep_(logicalUnit);
-    return logicalUnit;
-  }
-
-  private void reduceLogicalQueryUnitStep_(ExecutionBlock cur) {
-    if (cur.hasChildBlock()) {
-      for (ExecutionBlock childBlock: cur.getChildBlocks())
-        reduceLogicalQueryUnitStep_(childBlock);
-    }
-
-    for (ExecutionBlock childBlock: cur.getChildBlocks()) {
-      if (childBlock.getStoreTableNode().getChild().getType() != NodeType.UNION &&
-          childBlock.getPartitionType() == PartitionType.LIST) {
-        mergeLogicalUnits(cur, childBlock);
-      }
-    }
-  }
-  
-  private ExecutionBlock mergeLogicalUnits(ExecutionBlock parent, ExecutionBlock child) {
-    LogicalNode p = PlannerUtil.findTopParentNode(parent.getPlan(), NodeType.SCAN);
-
-    if (p instanceof UnaryNode) {
-      UnaryNode u = (UnaryNode) p;
-      ScanNode scan = (ScanNode) u.getChild();
-      LogicalNode c = child.getStoreTableNode().getChild();
-
-      parent.removeChildBlock(scan);
-      u.setChild(c);
-      parent.setPlan(parent.getPlan());
-      parent.addChildBlocks(child.getChildBlockMap());
-    }
-    return parent;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 9201997..7b6caa9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -21,17 +21,71 @@
  */
 package org.apache.tajo.engine.planner.global;
 
+import org.apache.tajo.DataChannel;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlockCursor;
+import org.apache.tajo.master.QueryContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
 
 public class MasterPlan {
+  private final QueryId queryId;
+  private final QueryContext context;
+  private final LogicalPlan plan;
   private ExecutionBlock root;
-  private String outputTableName;
+  private AtomicInteger nextId = new AtomicInteger(0);
+
+  private ExecutionBlock terminalBlock;
+  private Map<ExecutionBlockId, ExecutionBlock> execBlockMap = new HashMap<ExecutionBlockId, ExecutionBlock>();
+  private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
+      new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
+
+  public ExecutionBlockId newExecutionBlockId() {
+    return new ExecutionBlockId(queryId, nextId.incrementAndGet());
+  }
+
+  public boolean isTerminal(ExecutionBlock execBlock) {
+    return terminalBlock == execBlock;
+  }
+
+  public ExecutionBlock getTerminalBlock() {
+    return terminalBlock;
+  }
+
+  public ExecutionBlock createTerminalBlock() {
+    terminalBlock = newExecutionBlock();
+    return terminalBlock;
+  }
+
+  public MasterPlan(QueryId queryId, QueryContext context, LogicalPlan plan) {
+    this.queryId = queryId;
+    this.context = context;
+    this.plan = plan;
+  }
+
+  public QueryId getQueryId() {
+    return this.queryId;
+  }
+
+  public QueryContext getContext() {
+    return this.context;
+  }
 
-  public MasterPlan(ExecutionBlock root) {
-    setRoot(root);
+  public LogicalPlan getLogicalPlan() {
+    return this.plan;
   }
   
-  public void setRoot(ExecutionBlock root) {
+  public void setTerminal(ExecutionBlock root) { // NOTE(review): despite the name this assigns the root field, not terminalBlock -- confirm
     this.root = root;
   }
   
@@ -39,11 +93,158 @@ public class MasterPlan {
     return this.root;
   }
 
-  public void setOutputTableName(String tableName) {
-    this.outputTableName = tableName;
+  public ExecutionBlock newExecutionBlock() {
+    ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId());
+    execBlockMap.put(newExecBlock.getId(), newExecBlock);
+    return newExecBlock;
+  }
+
+  public boolean containsExecBlock(ExecutionBlockId execBlockId) {
+    return execBlockMap.containsKey(execBlockId);
+  }
+
+  public ExecutionBlock getExecBlock(ExecutionBlockId execBlockId) {
+    return execBlockMap.get(execBlockId);
+  }
+
+  public void addConnect(DataChannel dataChannel) {
+    execBlockGraph.connect(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
+  }
+
+  public void addConnect(ExecutionBlock src, ExecutionBlock target, PartitionType type) {
+    addConnect(src.getId(), target.getId(), type);
+  }
+
+  public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
+    addConnect(new DataChannel(src, target, type));
+  }
+
+  public boolean isConnected(ExecutionBlock src, ExecutionBlock target) {
+    return isConnected(src.getId(), target.getId());
+  }
+
+  public boolean isConnected(ExecutionBlockId src, ExecutionBlockId target) {
+    return execBlockGraph.isConnected(src, target);
+  }
+
+  public boolean isReverseConnected(ExecutionBlock target, ExecutionBlock src) {
+    return execBlockGraph.isReversedConnected(target.getId(), src.getId());
+  }
+
+  public boolean isReverseConnected(ExecutionBlockId target, ExecutionBlockId src) {
+    return execBlockGraph.isReversedConnected(target, src);
+  }
+
+  public DataChannel getChannel(ExecutionBlock src, ExecutionBlock target) {
+    return execBlockGraph.getEdge(src.getId(), target.getId());
+  }
+
+  public DataChannel getChannel(ExecutionBlockId src, ExecutionBlockId target) {
+    return execBlockGraph.getEdge(src, target);
+  }
+
+  public List<DataChannel> getOutgoingChannels(ExecutionBlockId src) {
+    return execBlockGraph.getOutgoingEdges(src);
+  }
+
+  public boolean isRoot(ExecutionBlock execBlock) {
+    return execBlockGraph.isRoot(execBlock.getId());
+  }
+
+  public boolean isLeaf(ExecutionBlock execBlock) {
+    return execBlockGraph.isLeaf(execBlock.getId());
+  }
+
+  public boolean isLeaf(ExecutionBlockId id) {
+    return execBlockGraph.isLeaf(id);
+  }
+
+  public List<DataChannel> getIncomingChannels(ExecutionBlockId target) {
+    return execBlockGraph.getIncomingEdges(target);
+  }
+
+  public void disconnect(ExecutionBlock src, ExecutionBlock target) {
+    disconnect(src.getId(), target.getId());
   }
 
-  public String getOutputTable() {
-    return outputTableName;
+  public void disconnect(ExecutionBlockId src, ExecutionBlockId target) {
+    execBlockGraph.disconnect(src, target);
   }
+
+  public List<ExecutionBlock> getChilds(ExecutionBlock execBlock) {
+    return getChilds(execBlock.getId());
+  }
+
+  public List<ExecutionBlock> getChilds(ExecutionBlockId id) {
+    List<ExecutionBlock> childBlocks = new ArrayList<ExecutionBlock>();
+    for (ExecutionBlockId cid : execBlockGraph.getChilds(id)) {
+      childBlocks.add(execBlockMap.get(cid));
+    }
+    return childBlocks;
+  }
+
+  public int getChildCount(ExecutionBlockId blockId) {
+    return execBlockGraph.getChildCount(blockId);
+  }
+
+  public ExecutionBlock getChild(ExecutionBlockId execBlockId, int idx) {
+    return execBlockMap.get(execBlockGraph.getChild(execBlockId, idx));
+  }
+
+  public ExecutionBlock getChild(ExecutionBlock executionBlock, int idx) {
+    return getChild(executionBlock.getId(), idx);
+  }
+
+  /**
+   * Renders the execution block graph, then one section per block handed out by an
+   * {@link ExecutionBlockCursor}: block id, role, incoming and outgoing data channels and the
+   * block's plan. The terminal block gets its header only.
+   */
+  @Override
+  public String toString() {
+    final String graphRule = "-------------------------------------------------------------------------------\n";
+    final String blockRule = "=======================================================\n";
+    StringBuilder text = new StringBuilder();
+    ExecutionBlockCursor blocks = new ExecutionBlockCursor(this);
+    text.append(graphRule).append("Execution Block Graph (TERMINAL - ").append(getTerminalBlock()).append(")\n");
+    text.append(graphRule).append(execBlockGraph.toStringGraph(getRoot().getId())).append(graphRule);
+
+    while (blocks.hasNext()) {
+      ExecutionBlock current = blocks.nextBlock();
+      text.append("\n").append(blockRule).append("Block Id: ").append(current.getId());
+
+      // Role label: terminal is checked first, then root, then leaf; anything else is intermediate.
+      boolean terminal = isTerminal(current);
+      String role;
+      if (terminal) {
+        role = " [TERMINAL]";
+      } else if (isRoot(current)) {
+        role = " [ROOT]";
+      } else {
+        role = isLeaf(current) ? " [LEAF]" : " [INTERMEDIATE]";
+      }
+      text.append(role).append("\n").append(blockRule);
+      if (terminal) {
+        continue;
+      }
+
+      if (!isLeaf(current)) {
+        text.append("\n[Incoming]\n");
+        for (DataChannel incoming : getIncomingChannels(current.getId())) {
+          text.append(incoming).append("\n");
+        }
+      }
+      // The "[Outgoing]" header is always written; only non-root blocks list channels under it.
+      text.append("\n[Outgoing]\n");
+      if (!isRoot(current)) {
+        for (DataChannel outgoing : getOutgoingChannels(current.getId())) {
+          text.append(outgoing).append("\n");
+        }
+      }
+      text.append("\n").append(current.getPlan());
+    }
+
+    return text.toString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
new file mode 100644
index 0000000..df428bb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.graph;
+
+import java.util.List;
+
+/**
+ * This represents a directed graph.
+ *
+ * @param <V> The vertex class type
+ * @param <E> The edge class type
+ */
+public interface DirectedGraph<V, E> {
+
+  int size(); // what is counted is up to the implementation; SimpleDirectedGraph counts vertices that have outgoing edges
+
+  void connect(V tail, V head, E edge); // adds the edge tail -> head carrying the given edge object
+
+  void disconnect(V tail, V head); // removes the edge tail -> head
+
+  boolean isConnected(V tail, V head); // whether an edge tail -> head exists
+
+  boolean isReversedConnected(V head, V tail); // same question as isConnected, asked from the head side
+
+  E getEdge(V tail, V head); // edge object of tail -> head; SimpleDirectedGraph returns null if absent
+
+  E getReverseEdge(V head, V tail); // as getEdge, looked up from the head side
+
+  int getChildCount(V v); // number of edges ending at v; note SimpleDirectedGraph returns -1, not 0, when there are none
+
+  List<E> getIncomingEdges(V head); // edges ending at head; SimpleDirectedGraph returns null when there are none
+
+  List<E> getOutgoingEdges(V tail); // edges starting at tail; SimpleDirectedGraph returns null when there are none
+
+  List<V> getChilds(V v); // tails of the edges ending at v, i.e. edges run child -> parent
+
+  V getChild(V block, int idx); // idx-th entry of getChilds(block)
+
+  V getParent(V v); // head of an edge starting at v; SimpleDirectedGraph returns the first one, or null if none
+
+  boolean isRoot(V v); // true when v has no outgoing edge
+
+  boolean isLeaf(V v); // true when v has no incoming edge
+
+  /**
+   * Visits src and every vertex reachable from it through getChilds, children before their parent (post-order).
+   */
+  void accept(V src, DirectedGraphVisitor<V> visitor);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraphVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraphVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraphVisitor.java
new file mode 100644
index 0000000..3deec7f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraphVisitor.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.graph;
+
+import java.util.Stack;
+
+public interface DirectedGraphVisitor<V> { // callback type accepted by DirectedGraph.accept
+  void visit(Stack<V> stack, V v); // v is the vertex being visited; SimpleDirectedGraph passes a stack holding v's ancestors only
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
new file mode 100644
index 0000000..3bc360f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.graph;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+ * This represents a simple directed graph. It does not support multiple edges between the same pair of vertices.
+ *
+ * @param <V> The vertex class type
+ * @param <E> The edge class type
+ */
+public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
+  /** map: tail (child) -> head (parent) -> edge */
+  private Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
+  /** map: head (parent) -> tail (child) -> edge */
+  private Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
+
+  @Override
+  public int size() { // number of keys in directedEdges, i.e. vertices that have outgoing edges
+    return directedEdges.size();
+  }
+
+  @Override
+  public void connect(V tail, V head, E edge) { // records tail -> head in both maps so either end can be looked up
+    TUtil.putToNestedMap(directedEdges, tail, head, edge);
+    TUtil.putToNestedMap(reversedEdges, head, tail, edge);
+  }
+
+  @Override
+  public void disconnect(V tail, V head) { // throws RuntimeException when tail -> head is not an existing edge
+    if (isConnected(tail, head)) { // connect() fills both maps together, so neither lookup below can return null
+      directedEdges.get(tail).remove(head);
+      if (directedEdges.get(tail).isEmpty()) {
+        directedEdges.remove(tail); // drop empty inner maps so isRoot()/size() stay accurate
+      }
+
+      reversedEdges.get(head).remove(tail);
+      if (reversedEdges.get(head).isEmpty()) {
+        reversedEdges.remove(head); // same for isLeaf()
+      }
+    } else {
+      throw new RuntimeException("Not connected channel: " + tail + " -> " + head);
+    }
+  }
+
+  @Override
+  public boolean isConnected(V tail, V head) {
+    return directedEdges.containsKey(tail) && directedEdges.get(tail).containsKey(head);
+  }
+
+  @Override
+  public boolean isReversedConnected(V head, V tail) {
+    return reversedEdges.containsKey(head) && reversedEdges.get(head).containsKey(tail);
+  }
+
+  @Override
+  public E getEdge(V tail, V head) { // null when tail -> head does not exist
+    if (isConnected(tail, head)) {
+      return directedEdges.get(tail).get(head);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public E getReverseEdge(V head, V tail) { // null when tail -> head does not exist
+    if (isReversedConnected(head, tail)) {
+      return reversedEdges.get(head).get(tail);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getChildCount(V v) {
+    if (reversedEdges.containsKey(v)) {
+      return reversedEdges.get(v).size();
+    } else {
+      return -1; // note: -1, not 0, signals "no incoming edges"
+    }
+  }
+
+  @Override
+  public List<E> getIncomingEdges(V head) { // immutable snapshot, or null when head has no incoming edges
+    if (reversedEdges.containsKey(head)) {
+      return ImmutableList.copyOf(reversedEdges.get(head).values());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public List<E> getOutgoingEdges(V tail) { // immutable snapshot, or null when tail has no outgoing edges
+    if (directedEdges.containsKey(tail)) {
+      return ImmutableList.copyOf(directedEdges.get(tail).values());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public List<V> getChilds(V v) { // tails of the edges ending at v; an empty list (never null) when there are none
+    List<V> childBlocks = new ArrayList<V>();
+    if (reversedEdges.containsKey(v)) {
+      for (Map.Entry<V, E> entry: reversedEdges.get(v).entrySet()) {
+        childBlocks.add(entry.getKey());
+      }
+    }
+    return childBlocks;
+  }
+
+  @Override
+  public V getChild(V block, int idx) { // List.get range-checks idx against the child list
+    return getChilds(block).get(idx);
+  }
+
+  @Override
+  public V getParent(V v) { // first head recorded for v, or null when v has no outgoing edge
+    if (directedEdges.containsKey(v)) {
+      return directedEdges.get(v).keySet().iterator().next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean isRoot(V v) { // no outgoing edge; also true for a vertex this graph has never seen
+    return !directedEdges.containsKey(v);
+  }
+
+  @Override
+  public boolean isLeaf(V v) { // no incoming edge; also true for a vertex this graph has never seen
+    return !reversedEdges.containsKey(v);
+  }
+
+  @Override
+  public void accept(V source, DirectedGraphVisitor<V> visitor) { // post-order walk starting at source
+    Stack<V> stack = new Stack<V>();
+    visitRecursive(stack, source, visitor);
+  }
+
+  private void visitRecursive(Stack<V> stack, V current, DirectedGraphVisitor<V> visitor) {
+    stack.push(current);
+    for (V child : getChilds(current)) {
+      visitRecursive(stack, child, visitor);
+    }
+    stack.pop(); // popped before the callback, so the visitor sees only current's ancestors on the stack
+    visitor.visit(stack, current);
+  }
+
+  public String toString() {
+    return "G (|v| = " + directedEdges.size() +")";
+  }
+
+  public String printDepthString(DepthString planStr) { // one "|-vertex" line, indented three spaces per depth level
+    StringBuilder output = new StringBuilder();
+    String pad = new String(new char[planStr.depth * 3]).replace('\0', ' ');
+    output.append(pad + "|-" + planStr.vertexStr).append("\n");
+
+    return output.toString();
+  }
+
+  public String toStringGraph(V vertex) { // indented listing of the vertices visited from vertex
+    StringBuilder sb = new StringBuilder();
+    QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder();
+    accept(vertex, visitor);
+    Stack<DepthString> depthStrings = visitor.getDepthStrings();
+    while(!depthStrings.isEmpty()) { // the post-order walk pushes the start vertex last, so popping prints it first
+      sb.append(printDepthString(depthStrings.pop()));
+    }
+    return sb.toString();
+  }
+
+  private class DepthString { // a vertex's string form paired with its depth in the walk
+    int depth;
+    String vertexStr;
+
+    DepthString(int depth, String vertexStr) {
+      this.depth = depth;
+      this.vertexStr = vertexStr;
+    }
+  }
+
+  private class QueryGraphTopologyStringBuilder implements DirectedGraphVisitor<V> {
+    Stack<DepthString> depthString = new Stack<DepthString>();
+
+    @Override
+    public void visit(Stack<V> stack, V vertex) { // depth = number of ancestors currently on the stack
+      depthString.push(new DepthString(stack.size(), vertex.toString()));
+    }
+
+    public Stack<DepthString> getDepthStrings() {
+      return depthString;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
index df4b046..e2724e7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
@@ -26,7 +26,7 @@ import org.apache.tajo.json.GsonObject;
 
 public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonObject {
 	@Expose LogicalNode leftChild = null;
-	@Expose LogicalNode inner = null;
+	@Expose LogicalNode rightChild = null;
 	
 	public BinaryNode() {
 		super();
@@ -39,27 +39,27 @@ public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonO
 		super(opType);
 	}
 	
-	public LogicalNode getLeftChild() {
-		return this.leftChild;
+	public <T extends LogicalNode> T getLeftChild() {
+		return (T) this.leftChild;
 	}
 	
 	public void setLeftChild(LogicalNode op) {
 		this.leftChild = op;
 	}
 
-	public LogicalNode getRightChild() {
-		return this.inner;
+	public <T extends LogicalNode> T getRightChild() {
+		return (T) this.rightChild;
 	}
 
 	public void setRightChild(LogicalNode op) {
-		this.inner = op;
+		this.rightChild = op;
 	}
 	
 	@Override
   public Object clone() throws CloneNotSupportedException {
 	  BinaryNode binNode = (BinaryNode) super.clone();
 	  binNode.leftChild = (LogicalNode) leftChild.clone();
-	  binNode.inner = (LogicalNode) inner.clone();
+	  binNode.rightChild = (LogicalNode) rightChild.clone();
 	  
 	  return binNode;
 	}
@@ -67,12 +67,12 @@ public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonO
 	public void preOrder(LogicalNodeVisitor visitor) {
 	  visitor.visit(this);
 	  leftChild.postOrder(visitor);
-    inner.postOrder(visitor);    
+    rightChild.postOrder(visitor); // NOTE(review): preOrder descends into both children with postOrder, not preOrder -- confirm intended
   }
 	
 	public void postOrder(LogicalNodeVisitor visitor) {
     leftChild.postOrder(visitor);
-    inner.postOrder(visitor);
+    rightChild.postOrder(visitor);
     visitor.visit(this);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index c06623b..f7ff8ef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 public class CreateTableNode extends LogicalNode implements Cloneable {
@@ -88,6 +89,12 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   public void setExternal(boolean external) {
     this.external = external;
   }
+
+
+  @Override
+  public PlanString getPlanString() { // returns a PlanString built from the fixed text "CreateTable"; no table details are added
+    return new PlanString("CreateTable");
+  }
   
   @Override
   public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
index 7ddb5ef..708c03a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.engine.planner.logical;
 
+import org.apache.tajo.engine.planner.PlanString;
+
 public class DropTableNode extends LogicalNode {
   private String tableName;
 
@@ -30,6 +32,11 @@ public class DropTableNode extends LogicalNode {
     return this.tableName;
   }
 
+  @Override
+  public PlanString getPlanString() { // returns a PlanString built from the fixed text "DropTable"; tableName is not included
+    return new PlanString("DropTable");
+  }
+
   public boolean equals(Object obj) {
     if (obj instanceof DropTableNode) {
       DropTableNode other = (DropTableNode) obj;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
index 286f1c9..bb323dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
@@ -22,6 +22,7 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
@@ -89,4 +90,9 @@ public class EvalExprNode extends LogicalNode implements Projectable {
   public void postOrder(LogicalNodeVisitor visitor) {
     // nothing
   }
+
+  @Override
+  public PlanString getPlanString() { // this node provides no plan string
+    return null; // NOTE(review): other nodes in this change return a PlanString instance; callers must tolerate null here -- confirm
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
index e7c7271..fd3c0d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -21,6 +21,8 @@
  */
 package org.apache.tajo.engine.planner.logical;
 
+import org.apache.tajo.engine.planner.PlanString;
+
 public class ExceptNode extends BinaryNode {
 
   public ExceptNode() {
@@ -33,6 +35,15 @@ public class ExceptNode extends BinaryNode {
     setRightChild(inner);
   }
 
+  @Override
+  public PlanString getPlanString() { // appends " (L - <left table name>, R - <right table name>)" to the "Except" title
+    PlanString description = new PlanString("Except");
+    description.appendTitle(" (L - " + this.<TableSubQueryNode>getLeftChild().getTableName()); // child must be a TableSubQueryNode
+    description.appendTitle(", R - " + this.<TableSubQueryNode>getRightChild().getTableName());
+    description.appendTitle(")");
+    return description;
+  }
+
   public String toString() {
     return getLeftChild().toString() + "\n EXCEPT \n" + getRightChild().toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index c6b9bc8..34da374 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.logical;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
@@ -43,6 +44,10 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     this(columns);
     this.havingCondition = havingCondition;
   }
+
+  public final boolean isEmptyGrouping() { // true when no grouping columns are set (null or zero-length)
+    return !(columns != null && columns.length > 0);
+  }
 	
 	public final Column [] getGroupingColumns() {
 	  return this.columns;
@@ -143,4 +148,33 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     return grp;
   }
+
+  /**
+   * Builds the explain entry for this aggregation: the title "Aggregation",
+   * a "Targets: ..." line and a "Groups: (...)" line, both comma-separated.
+   *
+   * @return a new PlanString describing this node
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Aggregation");
+
+    StringBuilder sb = new StringBuilder("Targets: ");
+    for (int i = 0; i < targets.length; i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(targets[i]);
+    }
+    planStr.addExplan(sb.toString());
+
+    sb = new StringBuilder("Groups: (");
+    // columns may be null (isEmptyGrouping() allows it); print "Groups: ()" instead of throwing an NPE.
+    for (int j = 0; columns != null && j < columns.length; j++) {
+      sb.append(j > 0 ? "," : "").append(columns[j].getColumnName());
+    }
+    planStr.addExplan(sb.append(")").toString());
+
+    return planStr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
index 8a9ffe4..0cfbfb5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
@@ -29,7 +29,6 @@ public class IndexScanNode extends ScanNode {
   @Expose private SortSpec [] sortKeys;
   @Expose private Schema keySchema = null;
   @Expose private Datum[] datum = null;
-  //TODO- @Expose private IndexType type;
   
   public IndexScanNode(ScanNode scanNode , 
       Schema keySchema , Datum[] datum, SortSpec[] sortKeys ) {
@@ -38,7 +37,6 @@ public class IndexScanNode extends ScanNode {
     setFromTable(scanNode.getFromTable());
     setInSchema(scanNode.getInSchema());
     setOutSchema(scanNode.getOutSchema());
-    setLocal(scanNode.isLocal());
     setTargets(scanNode.getTargets());
     setType(NodeType.BST_INDEX_SCAN);
     this.sortKeys = sortKeys;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
index 870d3bd..0882e56 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
@@ -21,6 +21,8 @@
  */
 package org.apache.tajo.engine.planner.logical;
 
+import org.apache.tajo.engine.planner.PlanString;
+
 public class IntersectNode extends BinaryNode {
 
   public IntersectNode() {
@@ -33,6 +35,15 @@ public class IntersectNode extends BinaryNode {
     setRightChild(inner);
   }
 
+  /** Explain title: {@code Intersect (L - leftTable, R - rightTable)}; both children are cast to TableSubQueryNode. */
+  @Override
+  public PlanString getPlanString() {
+    String leftName = ((TableSubQueryNode) getLeftChild()).getTableName();
+    String rightName = ((TableSubQueryNode) getRightChild()).getTableName();
+    return new PlanString("Intersect")
+        .appendTitle(" (L - " + leftName).appendTitle(", R - " + rightName).appendTitle(")");
+  }
+
   public String toString() {
     return getLeftChild().toString() + "\n INTERSECT \n" + getRightChild().toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
index 6132c90..173ad55 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
@@ -24,6 +24,7 @@ package org.apache.tajo.engine.planner.logical;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.Target;
 
 public class JoinNode extends BinaryNode implements Projectable, Cloneable {
@@ -80,11 +81,21 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable {
   }
 
   @Override
+  public PlanString getPlanString() {
+    // Title is "Join (type : <joinType>)"; a "Join Cond" line is added only when a join qual is present.
+    PlanString plan = new PlanString("Join (type : ").appendTitle(joinType + ")");
+    if (hasJoinQual()) {
+      plan.addExplan("Join Cond: " + joinQual.toString());
+    }
+    return plan;
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj instanceof JoinNode) {
       JoinNode other = (JoinNode) obj;
       return super.equals(other) && leftChild.equals(other.leftChild)
-          && inner.equals(other.inner);
+          && rightChild.equals(other.rightChild);
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
index d5bc926..697f1fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.planner.PlanString;
 
 public final class LimitNode extends UnaryNode implements Cloneable {
 	@Expose private long fetchFirstNum;
@@ -35,6 +36,11 @@ public final class LimitNode extends UnaryNode implements Cloneable {
   public long getFetchFirstNum() {
     return fetchFirstNum;
   }
+
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString("Limit"); // title only; the fetch count (fetchFirstNum) is not part of it
+  }
   
   @Override 
   public boolean equals(Object obj) {
@@ -56,10 +62,10 @@ public final class LimitNode extends UnaryNode implements Cloneable {
   }
 
   public String toString() {
-    StringBuilder sb = new StringBuilder("LIMIT ").append(fetchFirstNum);
+    StringBuilder sb = new StringBuilder("Limit (").append(fetchFirstNum).append(")");
 
-    sb.append("\n\"out schema: ").append(getOutSchema())
-        .append("\n\"in schema: " + getInSchema());
+    sb.append("\n  \"out schema: ").append(getOutSchema())
+        .append("\n  \"in schema: " + getInSchema());
     sb.append("\n").append(getChild().toString());
 
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
index 6daaa50..7d7e86d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
@@ -22,9 +22,10 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TUtil;
 
 public abstract class LogicalNode implements Cloneable, GsonObject {
@@ -35,7 +36,6 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 	@Expose	private double cost = 0;
 	
 	public LogicalNode() {
-		
 	}
 
 	public LogicalNode(NodeType type) {
@@ -109,4 +109,6 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 
 	public abstract void preOrder(LogicalNodeVisitor visitor);
   public abstract void postOrder(LogicalNodeVisitor visitor);
+
+  public abstract PlanString getPlanString(); // PlanString describing this node; each concrete node supplies its own
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
index 67c08a6..e5373db 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.engine.planner.logical;
 
+import org.apache.tajo.engine.planner.PlanString;
+
 public class LogicalRootNode extends UnaryNode implements Cloneable {
   public LogicalRootNode() {
     super(NodeType.ROOT);
@@ -44,4 +46,9 @@ public class LogicalRootNode extends UnaryNode implements Cloneable {
   public Object clone() throws CloneNotSupportedException {
     return super.clone();
   }
+
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString("Root"); // the root node contributes only its title
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 35171ef..792fee8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -45,7 +45,9 @@ public enum NodeType {
   SELECTION(SelectionNode.class),
   STORE(StoreTableNode.class),
   SORT(SortNode.class),
-  UNION(UnionNode.class);
+  UNION(UnionNode.class),
+  TABLE_SUBQUERY(TableSubQueryNode.class);
+
 
   private final Class<? extends LogicalNode> baseClass;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
index 90891bc..935f188 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.Target;
 
 import java.util.Arrays;
@@ -108,4 +109,31 @@ public class ProjectionNode extends UnaryNode implements Projectable {
 	  
 	  return projNode;
 	}
+
+  /**
+   * Describes this projection for explain output: the title "Projection: "
+   * (with " (distinct)" appended when distinct is set), the comma-separated
+   * targets, and the out/in schemas when they are non-null.
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString plan = new PlanString("Projection: ");
+    if (distinct) {
+      plan.appendTitle(" (distinct)");
+    }
+
+    StringBuilder targetList = new StringBuilder("Targets: ");
+    for (int idx = 0; idx < targets.length; idx++) {
+      // Put the separator in front of every target except the first.
+      targetList.append(idx > 0 ? ", " : "").append(targets[idx]);
+    }
+    plan.addExplan(targetList.toString());
+    if (getOutSchema() != null) {
+      plan.addExplan("out schema: " + getOutSchema().toString());
+    }
+    if (getInSchema() != null) {
+      plan.addExplan("in  schema: " + getInSchema().toString());
+    }
+    return plan;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
index 1e566b0..a37ad5f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
@@ -27,9 +27,7 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalNodeVisitor;
+import org.apache.tajo.engine.planner.PlanString;
 
 import java.net.URI;
 import java.util.*;
@@ -74,6 +72,12 @@ public final class ReceiveNode extends LogicalNode implements Cloneable {
     return Collections.unmodifiableSet(fetchMap.entrySet());
   }
 
+
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString("Receive "); // title only; note the trailing space in the literal
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ReceiveNode) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
new file mode 100644
index 0000000..583e472
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.catalog.Schema;
+
+public abstract class RelationNode extends LogicalNode {
+  // Base class for logical nodes that stand for a relation; only SCAN and TABLE_SUBQUERY node types are expected.
+  public RelationNode(NodeType nodeType) {
+    super(nodeType);
+    assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY); // checked only when assertions (-ea) are enabled
+  }
+  /** Name of the underlying table (ScanNode returns its FromTable's table name). */
+  public abstract String getTableName();
+  /** NOTE(review): presumably the alias-qualified name used to refer to this relation -- confirm with implementors. */
+  public abstract String getCanonicalName();
+  /** Schema of the relation; NOTE(review): exact meaning (table vs. output schema) is defined by implementors. */
+  public abstract Schema getTableSchema();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 584a697..6e528b0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -23,19 +23,17 @@ import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.FromTable;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
-public class ScanNode extends LogicalNode implements Projectable {
+public class ScanNode extends RelationNode implements Projectable {
 	@Expose private FromTable table;
 	@Expose private EvalNode qual;
 	@Expose private Target[] targets;
-	@Expose private boolean local;
-  @Expose private boolean broadcast;
 	
 	public ScanNode() {
-		super();
-		local = false;
+		super(NodeType.SCAN);
 	}
   
 	public ScanNode(FromTable table) {
@@ -43,10 +41,9 @@ public class ScanNode extends LogicalNode implements Projectable {
 		this.table = table;
 		this.setInSchema(table.getSchema());
 		this.setOutSchema(table.getSchema());
-		local = false;
 	}
 	
-	public String getTableId() {
+	public String getTableName() {
 	  return table.getTableName();
 	}
 	
@@ -70,14 +67,6 @@ public class ScanNode extends LogicalNode implements Projectable {
 	  return this.qual;
 	}
 	
-	public boolean isLocal() {
-	  return this.local;
-	}
-	
-	public void setLocal(boolean local) {
-	  this.local = local;
-	}
-	
 	public void setQual(EvalNode evalTree) {
 	  this.qual = evalTree;
 	}
@@ -96,14 +85,6 @@ public class ScanNode extends LogicalNode implements Projectable {
 	public Target [] getTargets() {
 	  return this.targets;
 	}
-
-  public boolean isBroadcast() {
-    return broadcast;
-  }
-
-  public void setBroadcast() {
-    broadcast = true;
-  }
 	
 	public FromTable getFromTable() {
 	  return this.table;
@@ -120,10 +101,6 @@ public class ScanNode extends LogicalNode implements Projectable {
 	  if (hasAlias()) {
 	    sb.append(",\"alias\": \"").append(table.getAlias());
 	  }
-
-    if (isBroadcast()) {
-      sb.append(",\"broadcast\": true\"");
-    }
 	  
 	  if (hasQual()) {
 	    sb.append(", \"qual\": \"").append(this.qual).append("\"");
@@ -196,4 +173,33 @@ public class ScanNode extends LogicalNode implements Projectable {
 	public void postOrder(LogicalNodeVisitor visitor) {        
     visitor.visit(this);
   }
+
+  /**
+   * Describes this scan for explain output: the title "Scan on " + table name
+   * (plus " as " + alias when the table has one), an optional "filter: " line,
+   * an optional "target list: " line, and the out/in schemas as detail lines.
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString plan = new PlanString("Scan on ").appendTitle(getTableName());
+    if (table.hasAlias()) {
+      plan.appendTitle(" as ").appendTitle(table.getAlias());
+    }
+    if (hasQual()) {
+      plan.addExplan("filter: ").appendExplain(this.qual.toString());
+    }
+    if (hasTargets()) {
+      plan.addExplan("target list: ");
+      for (int idx = 0; idx < targets.length; idx++) {
+        if (idx > 0) {
+          plan.appendExplain(", ");
+        }
+        plan.appendExplain(targets[idx].toString());
+      }
+    }
+
+    plan.addDetail("out schema: ").appendDetail(getOutSchema().toString());
+    plan.addDetail("in schema: ").appendDetail(getInSchema().toString());
+    return plan;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index 2b2a0ac..2435453 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
 
 public class SelectionNode extends UnaryNode implements Cloneable {
 	@Expose private EvalNode qual;
@@ -40,17 +41,14 @@ public class SelectionNode extends UnaryNode implements Cloneable {
 	public void setQual(EvalNode qual) {
 		this.qual = qual;
 	}
-  
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("\"Selection\": {\"qual\": \"").append(qual.toString()).append("\",");
-    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
-    sb.append("\n  \"in schema\": ").append(getInSchema()).append("}");
-    
-    return sb.toString()+"\n"
-    + getChild().toString();
+
+  @Override
+  public PlanString getPlanString() { // explain entry: title "Filter" plus one "Search Cond" line
+    PlanString planStr = new PlanString("Filter");
+    planStr.addExplan("Search Cond: " + getQual()); // string concatenation, so a null qual prints as "null"
+    return planStr;
   }
-  
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof SelectionNode) {
@@ -70,4 +68,14 @@ public class SelectionNode extends UnaryNode implements Cloneable {
     
     return selNode;
   }
+
+  public String toString() {
+    // JSON-like header for this node; the child's own rendering follows on the next line.
+    String header = "\"Selection\": {\"qual\": \"" + qual.toString() + "\","
+        + "\n  \"out schema\": " + getOutSchema() + ","
+        + "\n  \"in schema\": " + getInSchema() + "}";
+
+    String childText = getChild().toString();
+    return header + "\n" + childText;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
index 5456a77..96e6fc5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
@@ -28,8 +28,7 @@ import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.UnaryNode;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 import java.net.URI;
@@ -108,6 +107,11 @@ public class SendNode extends UnaryNode {
   }
 
   @Override
+  public PlanString getPlanString() { // explain entry for this node: just the title "Send"
+    return new PlanString("Send"); // non-null like every other node, so callers need no special case
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj instanceof SendNode) {
       SendNode other = (SendNode) obj;