You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:24 UTC

[30/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
new file mode 100644
index 0000000..0796cfd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.util.IndexUtil;
+
+import java.io.IOException;
+
+public class PhysicalPlannerImpl implements PhysicalPlanner {
+  private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
+  protected final TajoConf conf;
+  protected final StorageManager sm;
+
+  public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) {
+    this.conf = conf;
+    this.sm = sm;
+  }
+
+  public PhysicalExec createPlan(final TaskAttemptContext context,
+      final LogicalNode logicalPlan) throws InternalException {
+
+    PhysicalExec plan;
+
+    try {
+      plan = createPlanRecursive(context, logicalPlan);
+
+    } catch (IOException ioe) {
+      throw new InternalException(ioe);
+    }
+
+    return plan;
+  }
+
+  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+    PhysicalExec outer;
+    PhysicalExec inner;
+
+    switch (logicalNode.getType()) {
+
+      case ROOT:
+        LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
+        return createPlanRecursive(ctx, rootNode.getSubNode());
+
+      case EXPRS:
+        EvalExprNode evalExpr = (EvalExprNode) logicalNode;
+        return new EvalExprExec(ctx, evalExpr);
+
+      case STORE:
+        StoreTableNode storeNode = (StoreTableNode) logicalNode;
+        outer = createPlanRecursive(ctx, storeNode.getSubNode());
+        return createStorePlan(ctx, storeNode, outer);
+
+      case SELECTION:
+        SelectionNode selNode = (SelectionNode) logicalNode;
+        outer = createPlanRecursive(ctx, selNode.getSubNode());
+        return new SelectionExec(ctx, selNode, outer);
+
+      case PROJECTION:
+        ProjectionNode prjNode = (ProjectionNode) logicalNode;
+        outer = createPlanRecursive(ctx, prjNode.getSubNode());
+        return new ProjectionExec(ctx, prjNode, outer);
+
+      case SCAN:
+        outer = createScanPlan(ctx, (ScanNode) logicalNode);
+        return outer;
+
+      case GROUP_BY:
+        GroupbyNode grpNode = (GroupbyNode) logicalNode;
+        outer = createPlanRecursive(ctx, grpNode.getSubNode());
+        return createGroupByPlan(ctx, grpNode, outer);
+
+      case SORT:
+        SortNode sortNode = (SortNode) logicalNode;
+        outer = createPlanRecursive(ctx, sortNode.getSubNode());
+        return createSortPlan(ctx, sortNode, outer);
+
+      case JOIN:
+        JoinNode joinNode = (JoinNode) logicalNode;
+        outer = createPlanRecursive(ctx, joinNode.getOuterNode());
+        inner = createPlanRecursive(ctx, joinNode.getInnerNode());
+        return createJoinPlan(ctx, joinNode, outer, inner);
+
+      case UNION:
+        UnionNode unionNode = (UnionNode) logicalNode;
+        outer = createPlanRecursive(ctx, unionNode.getOuterNode());
+        inner = createPlanRecursive(ctx, unionNode.getInnerNode());
+        return new UnionExec(ctx, outer, inner);
+
+      case LIMIT:
+        LimitNode limitNode = (LimitNode) logicalNode;
+        outer = createPlanRecursive(ctx, limitNode.getSubNode());
+        return new LimitExec(ctx, limitNode.getInSchema(),
+            limitNode.getOutSchema(), outer, limitNode);
+
+      case CREATE_INDEX:
+        IndexWriteNode createIndexNode = (IndexWriteNode) logicalNode;
+        outer = createPlanRecursive(ctx, createIndexNode.getSubNode());
+        return createIndexWritePlan(sm, ctx, createIndexNode, outer);
+
+      case BST_INDEX_SCAN:
+        IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
+        outer = createIndexScanExec(ctx, indexScanNode);
+        return outer;
+
+      case RENAME:
+      case SET_UNION:
+      case SET_DIFF:
+      case SET_INTERSECT:
+      case INSERT_INTO:
+      case SHOW_TABLE:
+      case DESC_TABLE:
+      case SHOW_FUNCTION:
+      default:
+        return null;
+    }
+  }
+
+  private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) {
+    long size = 0;
+    for (String tableId : tableIds) {
+      Fragment[] fragments = ctx.getTables(tableId);
+      for (Fragment frag : fragments) {
+        size += frag.getLength();
+      }
+    }
+    return size;
+  }
+
+  public PhysicalExec createJoinPlan(TaskAttemptContext ctx, JoinNode joinNode,
+                                     PhysicalExec outer, PhysicalExec inner)
+      throws IOException {
+    switch (joinNode.getJoinType()) {
+      case CROSS_JOIN:
+        LOG.info("The planner chooses NLJoinExec");
+        return new NLJoinExec(ctx, joinNode, outer, inner);
+
+      case INNER:
+        String [] outerLineage = PlannerUtil.getLineage(joinNode.getOuterNode());
+        String [] innerLineage = PlannerUtil.getLineage(joinNode.getInnerNode());
+        long outerSize = estimateSizeRecursive(ctx, outerLineage);
+        long innerSize = estimateSizeRecursive(ctx, innerLineage);
+
+        final long threshold = 1048576 * 128; // 64MB
+
+        boolean hashJoin = false;
+        if (outerSize < threshold || innerSize < threshold) {
+          hashJoin = true;
+        }
+
+        if (hashJoin) {
+          PhysicalExec selectedOuter;
+          PhysicalExec selectedInner;
+
+          // HashJoinExec loads the inner relation to memory.
+          if (outerSize <= innerSize) {
+            selectedInner = outer;
+            selectedOuter = inner;
+          } else {
+            selectedInner = inner;
+            selectedOuter = outer;
+          }
+
+          LOG.info("The planner chooses HashJoinExec");
+          return new HashJoinExec(ctx, joinNode, selectedOuter, selectedInner);
+        }
+
+      default:
+        SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+            joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
+        ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
+            new SortNode(sortSpecs[0], outer.getSchema(), outer.getSchema()),
+            outer);
+        ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
+            new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
+            inner);
+
+        LOG.info("The planner chooses MergeJoinExec");
+        return new MergeJoinExec(ctx, joinNode, outerSort, innerSort,
+            sortSpecs[0], sortSpecs[1]);
+    }
+  }
+
+  public PhysicalExec createStorePlan(TaskAttemptContext ctx,
+                                      StoreTableNode plan, PhysicalExec subOp) throws IOException {
+    if (plan.hasPartitionKey()) {
+      switch (plan.getPartitionType()) {
+        case HASH:
+          return new PartitionedStoreExec(ctx, sm, plan, subOp);
+
+        case RANGE:
+          SortSpec [] sortSpecs = null;
+          if (subOp instanceof SortExec) {
+            sortSpecs = ((SortExec)subOp).getSortSpecs();
+          } else {
+            Column[] columns = plan.getPartitionKeys();
+            SortSpec specs[] = new SortSpec[columns.length];
+            for (int i = 0; i < columns.length; i++) {
+              specs[i] = new SortSpec(columns[i]);
+            }
+          }
+
+          return new IndexedStoreExec(ctx, sm, subOp,
+              plan.getInSchema(), plan.getInSchema(), sortSpecs);
+      }
+    }
+    if (plan instanceof StoreIndexNode) {
+      return new TunnelExec(ctx, plan.getOutSchema(), subOp);
+    }
+
+    return new StoreTableExec(ctx, sm, plan, subOp);
+  }
+
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode)
+      throws IOException {
+    Preconditions.checkNotNull(ctx.getTable(scanNode.getTableId()),
+        "Error: There is no table matched to %s", scanNode.getTableId());
+
+    Fragment[] fragments = ctx.getTables(scanNode.getTableId());
+    return new SeqScanExec(ctx, sm, scanNode, fragments);
+  }
+
+  public PhysicalExec createGroupByPlan(TaskAttemptContext ctx,
+                                        GroupbyNode groupbyNode, PhysicalExec subOp) throws IOException {
+    Column[] grpColumns = groupbyNode.getGroupingColumns();
+    if (grpColumns.length == 0) {
+      LOG.info("The planner chooses HashAggregationExec");
+      return new HashAggregateExec(ctx, groupbyNode, subOp);
+    } else {
+      String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getSubNode());
+      long estimatedSize = estimateSizeRecursive(ctx, outerLineage);
+      final long threshold = 1048576 * 256;
+
+      // if the relation size is less than the reshold,
+      // the hash aggregation will be used.
+      if (estimatedSize <= threshold) {
+        LOG.info("The planner chooses HashAggregationExec");
+        return new HashAggregateExec(ctx, groupbyNode, subOp);
+      } else {
+        SortSpec[] specs = new SortSpec[grpColumns.length];
+        for (int i = 0; i < grpColumns.length; i++) {
+          specs[i] = new SortSpec(grpColumns[i], true, false);
+        }
+        SortNode sortNode = new SortNode(specs);
+        sortNode.setInSchema(subOp.getSchema());
+        sortNode.setOutSchema(subOp.getSchema());
+        // SortExec sortExec = new SortExec(sortNode, child);
+        ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode,
+            subOp);
+        LOG.info("The planner chooses SortAggregationExec");
+        return new SortAggregateExec(ctx, groupbyNode, sortExec);
+      }
+    }
+  }
+
+  public PhysicalExec createSortPlan(TaskAttemptContext ctx, SortNode sortNode,
+                                     PhysicalExec subOp) throws IOException {
+    return new ExternalSortExec(ctx, sm, sortNode, subOp);
+  }
+
+  public PhysicalExec createIndexWritePlan(StorageManager sm,
+                                           TaskAttemptContext ctx, IndexWriteNode indexWriteNode, PhysicalExec subOp)
+      throws IOException {
+
+    return new IndexWriteExec(ctx, sm, indexWriteNode, ctx.getTable(indexWriteNode
+        .getTableName()), subOp);
+  }
+
+  public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
+                                          IndexScanNode annotation)
+      throws IOException {
+    //TODO-general Type Index
+    Preconditions.checkNotNull(ctx.getTable(annotation.getTableId()),
+        "Error: There is no table matched to %s", annotation.getTableId());
+
+    Fragment[] fragments = ctx.getTables(annotation.getTableId());
+
+    String indexName = IndexUtil.getIndexNameOfFrag(fragments[0],
+        annotation.getSortKeys());
+    Path indexPath = new Path(sm.getTablePath(annotation.getTableId()), "index");
+
+    TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
+        annotation.getSortKeys());
+    return new BSTIndexScanExec(ctx, sm, annotation, fragments[0], new Path(
+        indexPath, indexName), annotation.getKeySchema(), comp,
+        annotation.getDatum());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
new file mode 100644
index 0000000..50b539a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class PlannerUtil {
+  private static final Log LOG = LogFactory.getLog(PlannerUtil.class);
+  
+  public static String [] getLineage(LogicalNode node) {
+    LogicalNode [] scans =  PlannerUtil.findAllNodes(node, ExprType.SCAN);
+    String [] tableNames = new String[scans.length];
+    ScanNode scan;
+    for (int i = 0; i < scans.length; i++) {
+      scan = (ScanNode) scans[i];
+      /*if (scan.hasAlias()) {
+        tableNames[i] = scan.getAlias();
+      } else {*/
+        tableNames[i] = scan.getTableId();
+      //}
+    }
+    return tableNames;
+  }
+  
+  public static LogicalNode insertNode(LogicalNode parent, LogicalNode newNode) {
+    Preconditions.checkArgument(parent instanceof UnaryNode);
+    Preconditions.checkArgument(newNode instanceof UnaryNode);
+    
+    UnaryNode p = (UnaryNode) parent;
+    LogicalNode c = p.getSubNode();
+    UnaryNode m = (UnaryNode) newNode;
+    m.setInSchema(c.getOutSchema());
+    m.setOutSchema(c.getOutSchema());
+    m.setSubNode(c);
+    p.setSubNode(m);
+    
+    return p;
+  }
+  
+  /**
+   * Delete the child of a given parent operator.
+   * 
+   * @param parent Must be a unary logical operator.
+   * @return input parent node
+   */
+  public static LogicalNode deleteNode(LogicalNode parent) {
+    if (parent instanceof UnaryNode) {
+      UnaryNode unary = (UnaryNode) parent;
+      if (unary.getSubNode() instanceof UnaryNode) {
+        UnaryNode child = (UnaryNode) unary.getSubNode();
+        LogicalNode grandChild = child.getSubNode();
+        unary.setSubNode(grandChild);
+      } else {
+        throw new InvalidQueryException("Unexpected logical plan: " + parent);
+      }
+    } else {
+      throw new InvalidQueryException("Unexpected logical plan: " + parent);
+    }    
+    return parent;
+  }
+  
+  public static void replaceNode(LogicalNode plan, LogicalNode newNode, ExprType type) {
+    LogicalNode parent = findTopParentNode(plan, type);
+    Preconditions.checkArgument(parent instanceof UnaryNode);
+    Preconditions.checkArgument(!(newNode instanceof BinaryNode));
+    UnaryNode parentNode = (UnaryNode) parent;
+    LogicalNode child = parentNode.getSubNode();
+    if (child instanceof UnaryNode) {
+      ((UnaryNode) newNode).setSubNode(((UnaryNode)child).getSubNode());
+    }
+    parentNode.setSubNode(newNode);
+  }
+  
+  public static LogicalNode insertOuterNode(LogicalNode parent, LogicalNode outer) {
+    Preconditions.checkArgument(parent instanceof BinaryNode);
+    Preconditions.checkArgument(outer instanceof UnaryNode);
+    
+    BinaryNode p = (BinaryNode) parent;
+    LogicalNode c = p.getOuterNode();
+    UnaryNode m = (UnaryNode) outer;
+    m.setInSchema(c.getOutSchema());
+    m.setOutSchema(c.getOutSchema());
+    m.setSubNode(c);
+    p.setOuter(m);
+    return p;
+  }
+  
+  public static LogicalNode insertInnerNode(LogicalNode parent, LogicalNode inner) {
+    Preconditions.checkArgument(parent instanceof BinaryNode);
+    Preconditions.checkArgument(inner instanceof UnaryNode);
+    
+    BinaryNode p = (BinaryNode) parent;
+    LogicalNode c = p.getInnerNode();
+    UnaryNode m = (UnaryNode) inner;
+    m.setInSchema(c.getOutSchema());
+    m.setOutSchema(c.getOutSchema());
+    m.setSubNode(c);
+    p.setInner(m);
+    return p;
+  }
+  
+  public static LogicalNode insertNode(LogicalNode parent, 
+      LogicalNode left, LogicalNode right) {
+    Preconditions.checkArgument(parent instanceof BinaryNode);
+    Preconditions.checkArgument(left instanceof UnaryNode);
+    Preconditions.checkArgument(right instanceof UnaryNode);
+    
+    BinaryNode p = (BinaryNode)parent;
+    LogicalNode lc = p.getOuterNode();
+    LogicalNode rc = p.getInnerNode();
+    UnaryNode lm = (UnaryNode)left;
+    UnaryNode rm = (UnaryNode)right;
+    lm.setInSchema(lc.getOutSchema());
+    lm.setOutSchema(lc.getOutSchema());
+    lm.setSubNode(lc);
+    rm.setInSchema(rc.getOutSchema());
+    rm.setOutSchema(rc.getOutSchema());
+    rm.setSubNode(rc);
+    p.setOuter(lm);
+    p.setInner(rm);
+    return p;
+  }
+  
+  public static LogicalNode transformGroupbyTo2P(GroupbyNode gp) {
+    Preconditions.checkNotNull(gp);
+        
+    try {
+      // cloning groupby node
+      GroupbyNode child = (GroupbyNode) gp.clone();
+
+      List<QueryBlock.Target> newChildTargets = Lists.newArrayList();
+      QueryBlock.Target[] secondTargets = gp.getTargets();
+      QueryBlock.Target[] firstTargets = child.getTargets();
+
+      QueryBlock.Target second;
+      QueryBlock.Target first;
+      int firstTargetId = 0;
+      for (int i = 0; i < firstTargets.length; i++) {
+        second = secondTargets[i];
+        first = firstTargets[i];
+
+        List<AggFuncCallEval> secondFuncs = EvalTreeUtil
+            .findDistinctAggFunction(second.getEvalTree());
+        List<AggFuncCallEval> firstFuncs = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+
+        if (firstFuncs.size() == 0) {
+          newChildTargets.add(first);
+          firstTargetId++;
+        } else {
+          for (AggFuncCallEval func : firstFuncs) {
+            func.setFirstPhase();
+            QueryBlock.Target
+                newTarget = new QueryBlock.Target(func, firstTargetId++);
+            newTarget.setAlias("column_"+ firstTargetId);
+
+
+            AggFuncCallEval secondFunc = null;
+            for (AggFuncCallEval sf : secondFuncs) {
+              if (func.equals(sf)) {
+                secondFunc = sf;
+                break;
+              }
+            }
+            if (func.getValueType().length > 1) { // hack for partial result
+              secondFunc.setArgs(new EvalNode[] {new FieldEval(
+                  new Column("column_"+firstTargetId, Type.ARRAY))});
+            } else {
+              secondFunc.setArgs(new EvalNode [] {new FieldEval(
+                  new Column("column_"+firstTargetId, newTarget.getEvalTree().getValueType()[0]))});
+            }
+            newChildTargets.add(newTarget);
+          }
+        }
+      }
+
+      QueryBlock.Target[] targetArray = newChildTargets.toArray(new QueryBlock.Target[newChildTargets.size()]);
+      child.setTargetList(targetArray);
+      child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
+      // set the groupby chaining
+      gp.setSubNode(child);
+      gp.setInSchema(child.getOutSchema());
+    } catch (CloneNotSupportedException e) {
+      LOG.error(e);
+    }
+    
+    return gp;
+  }
+  
+  public static LogicalNode transformSortTo2P(SortNode sort) {
+    Preconditions.checkNotNull(sort);
+    
+    try {
+      SortNode child = (SortNode) sort.clone();
+      sort.setSubNode(child);
+      sort.setInSchema(child.getOutSchema());
+      sort.setOutSchema(child.getOutSchema());
+    } catch (CloneNotSupportedException e) {
+      LOG.error(e);
+    }
+    return sort;
+  }
+  
+  public static LogicalNode transformGroupbyTo2PWithStore(GroupbyNode gb, 
+      String tableId) {
+    GroupbyNode groupby = (GroupbyNode) transformGroupbyTo2P(gb);
+    return insertStore(groupby, tableId);
+  }
+  
+  public static LogicalNode transformSortTo2PWithStore(SortNode sort, 
+      String tableId) {
+    SortNode sort2p = (SortNode) transformSortTo2P(sort);
+    return insertStore(sort2p, tableId);
+  }
+  
+  private static LogicalNode insertStore(LogicalNode parent, 
+      String tableId) {
+    StoreTableNode store = new StoreTableNode(tableId);
+    store.setLocal(true);
+    insertNode(parent, store);
+    
+    return parent;
+  }
+  
+  /**
+   * Find the top logical node matched to type from the given node
+   * 
+   * @param node start node
+   * @param type to find
+   * @return a found logical node
+   */
+  public static LogicalNode findTopNode(LogicalNode node, ExprType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+    
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.postOrder(finder);
+    
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return finder.getFoundNodes().get(0);
+  }
+
+  /**
+   * Find the all logical node matched to type from the given node
+   *
+   * @param node start node
+   * @param type to find
+   * @return a found logical node
+   */
+  public static LogicalNode [] findAllNodes(LogicalNode node, ExprType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.postOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return new LogicalNode[] {};
+    }
+    List<LogicalNode> founds = finder.getFoundNodes();
+    return founds.toArray(new LogicalNode[founds.size()]);
+  }
+  
+  /**
+   * Find a parent node of a given-typed operator.
+   * 
+   * @param node start node
+   * @param type to find
+   * @return the parent node of a found logical node
+   */
+  public static LogicalNode findTopParentNode(LogicalNode node, ExprType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+    
+    ParentNodeFinder finder = new ParentNodeFinder(type);
+    node.postOrder(finder);
+    
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return finder.getFoundNodes().get(0);
+  }
+  
+  private static class LogicalNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private final ExprType [] tofind;
+    private boolean topmost = false;
+    private boolean finished = false;
+
+    public LogicalNodeFinder(ExprType...type) {
+      this.tofind = type;
+    }
+
+    public LogicalNodeFinder(ExprType [] type, boolean topmost) {
+      this(type);
+      this.topmost = topmost;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (!finished) {
+        for (ExprType type : tofind) {
+          if (node.getType() == type) {
+            list.add(node);
+          }
+          if (topmost && list.size() > 0) {
+            finished = true;
+          }
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+  }
+  
+  private static class ParentNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private ExprType tofind;
+
+    public ParentNodeFinder(ExprType type) {
+      this.tofind = type;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        if (unary.getSubNode().getType() == tofind) {
+          list.add(node);
+        }
+      } else if (node instanceof BinaryNode){
+        BinaryNode bin = (BinaryNode) node;
+        if (bin.getOuterNode().getType() == tofind ||
+            bin.getInnerNode().getType() == tofind) {
+          list.add(node);
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+  }
+  
+  public static Set<Column> collectColumnRefs(LogicalNode node) {
+    ColumnRefCollector collector = new ColumnRefCollector();
+    node.postOrder(collector);
+    return collector.getColumns();
+  }
+  
+  private static class ColumnRefCollector implements LogicalNodeVisitor {
+    private Set<Column> collected = Sets.newHashSet();
+    
+    public Set<Column> getColumns() {
+      return this.collected;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      Set<Column> temp;
+      switch (node.getType()) {
+      case PROJECTION:
+        ProjectionNode projNode = (ProjectionNode) node;
+
+        for (QueryBlock.Target t : projNode.getTargets()) {
+          temp = EvalTreeUtil.findDistinctRefColumns(t.getEvalTree());
+          if (!temp.isEmpty()) {
+            collected.addAll(temp);
+          }
+        }
+
+        break;
+
+      case SELECTION:
+        SelectionNode selNode = (SelectionNode) node;
+        temp = EvalTreeUtil.findDistinctRefColumns(selNode.getQual());
+        if (!temp.isEmpty()) {
+          collected.addAll(temp);
+        }
+
+        break;
+        
+      case GROUP_BY:
+        GroupbyNode groupByNode = (GroupbyNode)node;
+        collected.addAll(Lists.newArrayList(groupByNode.getGroupingColumns()));
+        for (QueryBlock.Target t : groupByNode.getTargets()) {
+          temp = EvalTreeUtil.findDistinctRefColumns(t.getEvalTree());
+          if (!temp.isEmpty()) {
+            collected.addAll(temp);
+          }
+        }
+        if(groupByNode.hasHavingCondition()) {
+          temp = EvalTreeUtil.findDistinctRefColumns(groupByNode.
+              getHavingCondition());
+          if (!temp.isEmpty()) {
+            collected.addAll(temp);
+          }
+        }
+        
+        break;
+        
+      case SORT:
+        SortNode sortNode = (SortNode) node;
+        for (SortSpec key : sortNode.getSortKeys()) {
+          collected.add(key.getSortKey());
+        }
+        
+        break;
+        
+      case JOIN:
+        JoinNode joinNode = (JoinNode) node;
+        if (joinNode.hasJoinQual()) {
+          temp = EvalTreeUtil.findDistinctRefColumns(joinNode.getJoinQual());
+          if (!temp.isEmpty()) {
+            collected.addAll(temp);
+          }
+        }
+        
+        break;
+        
+      case SCAN:
+        ScanNode scanNode = (ScanNode) node;
+        if (scanNode.hasQual()) {
+          temp = EvalTreeUtil.findDistinctRefColumns(scanNode.getQual());
+          if (!temp.isEmpty()) {
+            collected.addAll(temp);
+          }
+        }
+
+        break;
+        
+      default:
+      }
+    }
+  }
+
+  public static QueryBlock.Target[] schemaToTargets(Schema schema) {
+    QueryBlock.Target[] targets = new QueryBlock.Target[schema.getColumnNum()];
+
+    FieldEval eval;
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets[i] = new QueryBlock.Target(eval, i);
+    }
+    return targets;
+  }
+
+  public static SortSpec[] schemaToSortSpecs(Schema schema) {
+    SortSpec[] specs = new SortSpec[schema.getColumnNum()];
+
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      specs[i] = new SortSpec(schema.getColumn(i), true, false);
+    }
+
+    return specs;
+  }
+
+  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+    Schema schema = new Schema();
+    for (SortSpec spec : sortSpecs) {
+      schema.addColumn(spec.getSortKey());
+    }
+
+    return schema;
+  }
+
+  /**
+   * is it join qual or not?
+   * TODO - this method does not support the self join (NTA-740)
+   * @param qual
+   * @return true if two operands refers to columns and the operator is comparison,
+   */
+  public static boolean isJoinQual(EvalNode qual) {
+    if (EvalTreeUtil.isComparisonOperator(qual)) {
+      List<Column> left = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
+      List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
+
+      if (left.size() == 1 && right.size() == 1 &&
+          !left.get(0).getTableName().equals(right.get(0).getTableName()))
+        return true;
+    }
+
+    return false;
+  }
+
+  public static SortSpec[][] getSortKeysFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+    List<Column []> joinKeyPairs = getJoinKeyPairs(joinQual, outer, inner);
+    SortSpec[] outerSortSpec = new SortSpec[joinKeyPairs.size()];
+    SortSpec[] innerSortSpec = new SortSpec[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      outerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[0]);
+      innerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[1]);
+    }
+
+    return new SortSpec[][] {outerSortSpec, innerSortSpec};
+  }
+
+  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+    SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, outer, inner);
+    TupleComparator [] comparators = new TupleComparator[2];
+    comparators[0] = new TupleComparator(outer, sortSpecs[0]);
+    comparators[1] = new TupleComparator(inner, sortSpecs[1]);
+    return comparators;
+  }
+
+  public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema outer, Schema inner) {
+    JoinKeyPairFinder finder = new JoinKeyPairFinder(outer, inner);
+    joinQual.preOrder(finder);
+    return finder.getPairs();
+  }
+
+  public static class JoinKeyPairFinder implements EvalNodeVisitor {
+    private final List<Column []> pairs = Lists.newArrayList();
+    private Schema [] schemas = new Schema[2];
+
+    public JoinKeyPairFinder(Schema outer, Schema inner) {
+      schemas[0] = outer;
+      schemas[1] = inner;
+    }
+
+    @Override
+    public void visit(EvalNode node) {
+      if (EvalTreeUtil.isJoinQual(node)) {
+        Column [] pair = new Column[2];
+
+        for (int i = 0; i <= 1; i++) { // access left, right sub expression
+          Column column = EvalTreeUtil.findAllColumnRefs(node.getExpr(i)).get(0);
+          for (int j = 0; j < schemas.length; j++) {
+          // check whether the column is for either outer or inner
+          // 0 is outer, and 1 is inner
+            if (schemas[j].contains(column.getQualifiedName())) {
+              pair[j] = column;
+            }
+          }
+        }
+
+        if (pair[0] == null || pair[1] == null) {
+          throw new IllegalStateException("Wrong join key: " + node);
+        }
+        pairs.add(pair);
+      }
+    }
+
+    public List<Column []> getPairs() {
+      return this.pairs;
+    }
+  }
+
+  public static Schema targetToSchema(QueryBlock.Target[] targets) {
+    Schema schema = new Schema();
+    for(QueryBlock.Target t : targets) {
+      DataType type;
+      if (t.getEvalTree().getValueType().length > 1) {
+        type = CatalogUtil.newDataTypeWithoutLen(Type.ARRAY);
+      } else {
+        type = t.getEvalTree().getValueType()[0];
+      }
+      String name;
+      if (t.hasAlias()) {
+        name = t.getAlias();
+      } else {
+        name = t.getEvalTree().getName();
+      }
+      schema.addColumn(name, type);
+    }
+
+    return schema;
+  }
+
+  public static EvalNode [] columnsToEvals(Column [] columns) {
+    EvalNode [] exprs = new EvalNode[columns.length];
+    for (int i = 0; i < columns.length; i++) {
+      exprs[i] = new FieldEval(columns[i]);
+    }
+    return exprs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
new file mode 100644
index 0000000..595f41a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.tajo.engine.parser.ParseTree;
+
+public interface PlanningContext {
+  String getRawQuery();
+
+  Tree getAST();
+
+  ParseTree getParseTree();
+
+  //TableMap getTableMap();
+
+  String getGeneratedColumnName();
+
+  String getGeneratedColumnName(String prefix);
+
+  String getExplicitOutputTable();
+
+  boolean hasExplicitOutputTable();
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
new file mode 100644
index 0000000..34ea73a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlanningContextImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.tajo.engine.parser.ParseTree;
+
+public class PlanningContextImpl implements PlanningContext  {
+  private String rawQuery;
+  // private TableMap tableMap = new TableMap();
+  private ParseTree parseTree;
+  private Tree ast;
+  private String outputTableName;
+
+  private static final String DEFAULT_TABLE_GEN_PREFIX = "table";
+  private static final String DEFAULT_COLUMN_GEN_PREFIX = "column";
+  private static final String SEPARATOR = "_";
+
+  private int generatedTableId = 1;
+  private int generatedColumnId = 1;
+
+
+  public PlanningContextImpl(String rawQuery) {
+    this.rawQuery = rawQuery;
+  }
+
+  @Override
+  public String getRawQuery() {
+    return rawQuery;
+  }
+
+  @Override
+  public Tree getAST() {
+    return ast;
+  }
+
+  public void setAST(Tree ast) {
+    this.ast = ast;
+  }
+
+  @Override
+  public ParseTree getParseTree() {
+    return parseTree;
+  }
+
+//  @Override
+//  public TableMap getTableMap() {
+//    return tableMap;
+//  }
+
+  @Override
+  public String getGeneratedColumnName() {
+    return DEFAULT_COLUMN_GEN_PREFIX + SEPARATOR + (generatedColumnId++);
+  }
+
+  @Override
+  public synchronized String getGeneratedColumnName(String prefix) {
+    return prefix + SEPARATOR + (generatedColumnId++);
+  }
+
+  @Override
+  public String getExplicitOutputTable() {
+    return outputTableName;
+  }
+
+  @Override
+  public boolean hasExplicitOutputTable() {
+    return outputTableName != null;
+  }
+
+  public void setOutputTableName(String outputTableName) {
+    this.outputTableName = outputTableName;
+  }
+
+  public void setParseTree(ParseTree parseTree) {
+    this.parseTree = parseTree;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
new file mode 100644
index 0000000..da1b43c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.parser.QueryBlock.Target;
+import org.apache.tajo.storage.Tuple;
+
+public class Projector {
+  private final Schema inSchema;
+  private final Schema outSchema;
+
+  // for projection
+  private final int targetNum;
+  private final int [] inMap;
+  private final int [] outMap;
+  private int [] evalOutMap; // target list?
+  private EvalNode[] evals;
+  private Tuple prevTuple;
+
+  public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+    this.inSchema = inSchema;
+    this.outSchema = outSchema;
+
+    this.targetNum = targets != null ? targets.length : 0;
+
+    inMap = new int[outSchema.getColumnNum() - targetNum];
+    outMap = new int[outSchema.getColumnNum() - targetNum];
+    int mapId = 0;
+    Column col;
+
+    if (targetNum > 0) {
+      evalOutMap = new int[targetNum];
+      evals = new EvalNode[targetNum];
+      for (int i = 0; i < targetNum; i++) {
+        // TODO - is it always  correct?
+        if (targets[i].hasAlias()) {
+          evalOutMap[i] = outSchema.getColumnId(targets[i].getAlias());
+        } else {
+          evalOutMap[i] = outSchema.getColumnId(targets[i].getEvalTree().getName());
+        }
+        evals[i] = targets[i].getEvalTree();
+      }
+
+      outer:
+      for (int targetId = 0; targetId < outSchema.getColumnNum(); targetId ++) {
+        for (int j = 0; j < evalOutMap.length; j++) {
+          if (evalOutMap[j] == targetId)
+            continue outer;
+        }
+
+        col = inSchema.getColumn(outSchema.getColumn(targetId).getQualifiedName());
+        outMap[mapId] = targetId;
+        inMap[mapId] = inSchema.getColumnId(col.getQualifiedName());
+        mapId++;
+      }
+    } else {
+      for (int targetId = 0; targetId < outSchema.getColumnNum(); targetId ++) {
+        col = inSchema.getColumn(outSchema.getColumn(targetId).getQualifiedName());
+        outMap[mapId] = targetId;
+        inMap[mapId] = inSchema.getColumnId(col.getQualifiedName());
+        mapId++;
+      }
+    }
+  }
+
+  public void eval(EvalContext[] evalContexts, Tuple in) {
+    this.prevTuple = in;
+    if (targetNum > 0) {
+      for (int i = 0; i < evals.length; i++) {
+        evals[i].eval(evalContexts[i], inSchema, in);
+      }
+    }
+  }
+
+  public void terminate(EvalContext [] evalContexts, Tuple out) {
+    for (int i = 0; i < inMap.length; i++) {
+      out.put(outMap[i], prevTuple.get(inMap[i]));
+    }
+    if (targetNum > 0) {
+      for (int i = 0; i < evals.length; i++) {
+        out.put(evalOutMap[i], evals[i].terminate(evalContexts[i]));
+      }
+    }
+  }
+
+  public EvalContext [] renew() {
+    EvalContext [] evalContexts = new EvalContext[targetNum];
+    for (int i = 0; i < targetNum; i++) {
+      evalContexts[i] = evals[i].newContext();
+    }
+
+    return evalContexts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
new file mode 100644
index 0000000..c18d403
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangeOverflowException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+public class RangeOverflowException extends RuntimeException {
+  public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) {
+    super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
new file mode 100644
index 0000000..e25903f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+
+import java.math.BigDecimal;
+
+public abstract class RangePartitionAlgorithm {
+  protected Schema schema;
+  protected TupleRange range;
+  protected final BigDecimal totalCard;
+  /** true if the end of the range is inclusive. Otherwise, it should be false. */
+  protected final boolean inclusive;
+
+  /**
+   *
+   * @param schema the schema of the range tuples
+   * @param range range to be partition
+   * @param inclusive true if the end of the range is inclusive. Otherwise, false.
+   */
+  public RangePartitionAlgorithm(Schema schema, TupleRange range, boolean inclusive) {
+    this.schema = schema;
+    this.range = range;
+    this.inclusive = inclusive;
+    this.totalCard = computeCardinalityForAllColumns(schema, range, inclusive);
+  }
+
+  /**
+   * It computes the value cardinality of a tuple range.
+   *
+   * @param dataType
+   * @param start
+   * @param end
+   * @return
+   */
+  public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
+                                              boolean inclusive) {
+    BigDecimal columnCard;
+
+    switch (dataType.getType()) {
+      case CHAR:
+        columnCard = new BigDecimal(end.asChar() - start.asChar());
+        break;
+      case BIT:
+        columnCard = new BigDecimal(end.asByte() - start.asByte());
+        break;
+      case INT2:
+        columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+        break;
+      case INT4:
+        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        break;
+      case INT8:
+        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        break;
+      case FLOAT4:
+        columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+        break;
+      case FLOAT8:
+        columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+        break;
+      case TEXT:
+        columnCard = new BigDecimal(end.asChars().charAt(0) - start.asChars().charAt(0));
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType + " is not supported yet");
+    }
+
+    return inclusive ? columnCard.add(new BigDecimal(1)) : columnCard;
+  }
+
+  /**
+   * It computes the value cardinality of a tuple range.
+   * @return
+   */
+  public static BigDecimal computeCardinalityForAllColumns(Schema schema, TupleRange range, boolean inclusive) {
+    Tuple start = range.getStart();
+    Tuple end = range.getEnd();
+    Column col;
+
+    BigDecimal cardinality = new BigDecimal(1);
+    BigDecimal columnCard;
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      col = schema.getColumn(i);
+      columnCard = computeCardinality(col.getDataType(), start.get(i), end.get(i), inclusive);
+
+      if (new BigDecimal(0).compareTo(columnCard) < 0) {
+        cardinality = cardinality.multiply(columnCard);
+      }
+    }
+
+    return cardinality;
+  }
+
+  public BigDecimal getTotalCardinality() {
+    return totalCard;
+  }
+
+  /**
+   *
+   * @param partNum the number of desired partitions, but it may return the less partitions.
+   * @return the end of intermediate ranges are exclusive, and the end of final range is inclusive.
+   */
+  public abstract TupleRange[] partition(int partNum);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
new file mode 100644
index 0000000..b52be45
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.VTuple;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+
+public class UniformRangePartition extends RangePartitionAlgorithm {
+  private int variableId;
+  private BigDecimal[] cardForEachDigit;
+  private BigDecimal[] colCards;
+
+  /**
+   *
+   * @param schema
+   * @param range
+   * @param inclusive true if the end of the range is inclusive
+   */
+  public UniformRangePartition(Schema schema, TupleRange range, boolean inclusive) {
+    super(schema, range, inclusive);
+    colCards = new BigDecimal[schema.getColumnNum()];
+    for (int i = 0; i < schema.getColumnNum(); i++) {
+      colCards[i] = computeCardinality(schema.getColumn(i).getDataType(), range.getStart().get(i),
+          range.getEnd().get(i), inclusive);
+    }
+
+    cardForEachDigit = new BigDecimal[colCards.length];
+    for (int i = 0; i < colCards.length ; i++) {
+      if (i == 0) {
+        cardForEachDigit[i] = colCards[i];
+      } else {
+        cardForEachDigit[i] = cardForEachDigit[i - 1].multiply(colCards[i]);
+      }
+    }
+  }
+
+  public UniformRangePartition(Schema schema, TupleRange range) {
+    this(schema, range, true);
+  }
+
+  @Override
+  public TupleRange[] partition(int partNum) {
+    Preconditions.checkArgument(partNum > 0,
+        "The number of partitions must be positive, but the given number: "
+        + partNum);
+    Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
+        "the number of partition cannot exceed total cardinality (" + totalCard + ")");
+
+    int varId;
+    for (varId = 0; varId < cardForEachDigit.length; varId++) {
+      if (cardForEachDigit[varId].compareTo(new BigDecimal(partNum)) >= 0)
+        break;
+    }
+    this.variableId = varId;
+
+    BigDecimal [] reverseCardsForDigit = new BigDecimal[variableId+1];
+    for (int i = variableId; i >= 0; i--) {
+      if (i == variableId) {
+        reverseCardsForDigit[i] = colCards[i];
+      } else {
+        reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+      }
+    }
+
+    List<TupleRange> ranges = Lists.newArrayList();
+    BigDecimal term = reverseCardsForDigit[0].divide(
+        new BigDecimal(partNum), RoundingMode.CEILING);
+    BigDecimal reminder = reverseCardsForDigit[0];
+    Tuple last = range.getStart();
+    while(reminder.compareTo(new BigDecimal(0)) > 0) {
+      if (reminder.compareTo(term) <= 0) { // final one is inclusive
+        ranges.add(new TupleRange(schema, last, range.getEnd()));
+      } else {
+        Tuple next = increment(last, term.longValue(), variableId);
+        ranges.add(new TupleRange(schema, last, next));
+      }
+      last = ranges.get(ranges.size() - 1).getEnd();
+      reminder = reminder.subtract(term);
+    }
+
+    return ranges.toArray(new TupleRange[ranges.size()]);
+  }
+
+  public boolean isOverflow(int colId, Datum last, BigDecimal inc) {
+    Column column = schema.getColumn(colId);
+    BigDecimal candidate;
+    boolean overflow = false;
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        candidate = inc.add(new BigDecimal(last.asByte()));
+        return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+      }
+      case CHAR: {
+        candidate = inc.add(new BigDecimal((int)last.asChar()));
+        return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+      }
+      case INT2: {
+        candidate = inc.add(new BigDecimal(last.asInt2()));
+        return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+      }
+      case INT4: {
+        candidate = inc.add(new BigDecimal(last.asInt4()));
+        return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+      }
+      case INT8: {
+        candidate = inc.add(new BigDecimal(last.asInt8()));
+        return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+      }
+      case FLOAT4: {
+        candidate = inc.add(new BigDecimal(last.asFloat4()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+      }
+      case FLOAT8: {
+        candidate = inc.add(new BigDecimal(last.asFloat8()));
+        return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+      }
+      case TEXT: {
+        candidate = inc.add(new BigDecimal((int)(last.asChars().charAt(0))));
+        return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+      }
+    }
+    return overflow;
+  }
+
+  public long incrementAndGetReminder(int colId, Datum last, long inc) {
+    Column column = schema.getColumn(colId);
+    long reminder = 0;
+    switch (column.getDataType().getType()) {
+      case BIT: {
+        long candidate = last.asByte() + inc;
+        byte end = range.getEnd().get(colId).asByte();
+        reminder = candidate - end;
+        break;
+      }
+      case CHAR: {
+        long candidate = last.asChar() + inc;
+        char end = range.getEnd().get(colId).asChar();
+        reminder = candidate - end;
+        break;
+      }
+      case INT4: {
+        int candidate = (int) (last.asInt4() + inc);
+        int end = range.getEnd().get(colId).asInt4();
+        reminder = candidate - end;
+        break;
+      }
+      case INT8: {
+        long candidate = last.asInt8() + inc;
+        long end = range.getEnd().get(colId).asInt8();
+        reminder = candidate - end;
+        break;
+      }
+      case FLOAT4: {
+        float candidate = last.asFloat4() + inc;
+        float end = range.getEnd().get(colId).asFloat4();
+        reminder = (long) (candidate - end);
+        break;
+      }
+      case FLOAT8: {
+        double candidate = last.asFloat8() + inc;
+        double end = range.getEnd().get(colId).asFloat8();
+        reminder = (long) Math.ceil(candidate - end);
+        break;
+      }
+      case TEXT: {
+        char candidate = ((char)(last.asChars().charAt(0) + inc));
+        char end = range.getEnd().get(colId).asChars().charAt(0);
+        reminder = (char) (candidate - end);
+        break;
+      }
+    }
+
+    // including zero
+    return reminder - 1;
+  }
+
+  /**
+   *
+   * @param last
+   * @param inc
+   * @return
+   */
+  public Tuple increment(final Tuple last, final long inc, final int baseDigit) {
+    BigDecimal [] incs = new BigDecimal[last.size()];
+    boolean [] overflowFlag = new boolean[last.size()];
+    BigDecimal [] result;
+    BigDecimal value = new BigDecimal(inc);
+
+    BigDecimal [] reverseCardsForDigit = new BigDecimal[baseDigit + 1];
+    for (int i = baseDigit; i >= 0; i--) {
+      if (i == baseDigit) {
+        reverseCardsForDigit[i] = colCards[i];
+      } else {
+        reverseCardsForDigit[i] = reverseCardsForDigit[i+1].multiply(colCards[i]);
+      }
+    }
+
+    for (int i = 0; i < baseDigit; i++) {
+      result = value.divideAndRemainder(reverseCardsForDigit[i + 1]);
+      incs[i] = result[0];
+      value = result[1];
+    }
+    int finalId = baseDigit;
+    incs[finalId] = value;
+    for (int i = finalId; i >= 0; i--) {
+      if (isOverflow(i, last.get(i), incs[i])) {
+        if (i == 0) {
+          throw new RangeOverflowException(range, last, incs[i].longValue());
+        }
+        long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
+        incs[i] = new BigDecimal(rem);
+        incs[i - 1] = incs[i-1].add(new BigDecimal(1));
+        overflowFlag[i] = true;
+      } else {
+        if (i > 0) {
+          incs[i] = value;
+          break;
+        }
+      }
+    }
+
+    for (int i = 0; i < incs.length; i++) {
+      if (incs[i] == null) {
+        incs[i] = new BigDecimal(0);
+      }
+    }
+
+    Tuple end = new VTuple(schema.getColumnNum());
+    Column column;
+    for (int i = 0; i < last.size(); i++) {
+      column = schema.getColumn(i);
+      switch (column.getDataType().getType()) {
+        case CHAR:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+          }
+          break;
+        case BIT:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createBit(
+                (byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+          }
+          break;
+        case INT2:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt2(
+                (short) (range.getStart().get(i).asInt2() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+          }
+          break;
+        case INT4:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt4(
+                (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+          } else {
+            end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
+          }
+          break;
+        case INT8:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createInt8(
+                range.getStart().get(i).asInt4() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+          }
+          break;
+        case FLOAT4:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createFloat4(
+                range.getStart().get(i).asFloat4() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+          }
+          break;
+        case FLOAT8:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createFloat8(
+                range.getStart().get(i).asFloat8() + incs[i].longValue()));
+          } else {
+            end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+          }
+          break;
+        case TEXT:
+          if (overflowFlag[i]) {
+            end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0)
+                + incs[i].longValue())) + ""));
+          } else {
+            end.put(i, DatumFactory.createText(
+                ((char) (last.get(i).asChars().charAt(0) + incs[i].longValue())) + ""));
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
+      }
+    }
+
+    return end;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
new file mode 100644
index 0000000..c0e4017
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalOptimizer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.UnaryNode;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+
+public class GlobalOptimizer {
+
+  public GlobalOptimizer() {
+    
+  }
+  
+  public MasterPlan optimize(MasterPlan plan) {
+    ExecutionBlock reducedStep = reduceSchedules(plan.getRoot());
+
+    MasterPlan optimized = new MasterPlan(reducedStep);
+    optimized.setOutputTableName(plan.getOutputTable());
+
+    return optimized;
+  }
+  
+  @VisibleForTesting
+  private ExecutionBlock reduceSchedules(ExecutionBlock logicalUnit) {
+    reduceLogicalQueryUnitStep_(logicalUnit);
+    return logicalUnit;
+  }
+
+  private void reduceLogicalQueryUnitStep_(ExecutionBlock cur) {
+    if (cur.hasChildBlock()) {
+      for (ExecutionBlock childBlock: cur.getChildBlocks())
+        reduceLogicalQueryUnitStep_(childBlock);
+    }
+
+    for (ExecutionBlock childBlock: cur.getChildBlocks()) {
+      if (childBlock.getStoreTableNode().getSubNode().getType() != ExprType.UNION &&
+          childBlock.getPartitionType() == PartitionType.LIST) {
+        mergeLogicalUnits(cur, childBlock);
+      }
+    }
+  }
+  
+  private ExecutionBlock mergeLogicalUnits(ExecutionBlock parent, ExecutionBlock child) {
+    LogicalNode p = PlannerUtil.findTopParentNode(parent.getPlan(), ExprType.SCAN);
+
+    if (p instanceof UnaryNode) {
+      UnaryNode u = (UnaryNode) p;
+      ScanNode scan = (ScanNode) u.getSubNode();
+      LogicalNode c = child.getStoreTableNode().getSubNode();
+
+      parent.removeChildBlock(scan);
+      u.setSubNode(c);
+      parent.setPlan(parent.getPlan());
+      parent.addChildBlocks(child.getChildBlockMap());
+    }
+    return parent;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
new file mode 100644
index 0000000..9201997
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.master.ExecutionBlock;
+
+public class MasterPlan {
+  private ExecutionBlock root;
+  private String outputTableName;
+
+  public MasterPlan(ExecutionBlock root) {
+    setRoot(root);
+  }
+  
+  public void setRoot(ExecutionBlock root) {
+    this.root = root;
+  }
+  
+  public ExecutionBlock getRoot() {
+    return this.root;
+  }
+
+  public void setOutputTableName(String tableName) {
+    this.outputTableName = tableName;
+  }
+
+  public String getOutputTable() {
+    return outputTableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
new file mode 100644
index 0000000..061964d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+
+public abstract class BinaryNode extends LogicalNode implements Cloneable {
+	@Expose
+	LogicalNode outer = null;
+	@Expose
+	LogicalNode inner = null;
+	
+	public BinaryNode() {
+		super();
+	}
+	
+	/**
+	 * @param opType
+	 */
+	public BinaryNode(ExprType opType) {
+		super(opType);
+	}
+	
+	public LogicalNode getOuterNode() {
+		return this.outer;
+	}
+	
+	public void setOuter(LogicalNode op) {
+		this.outer = op;
+	}
+
+	public LogicalNode getInnerNode() {
+		return this.inner;
+	}
+
+	public void setInner(LogicalNode op) {
+		this.inner = op;
+	}
+	
+	@Override
+  public Object clone() throws CloneNotSupportedException {
+	  BinaryNode binNode = (BinaryNode) super.clone();
+	  binNode.outer = (LogicalNode) outer.clone();
+	  binNode.inner = (LogicalNode) inner.clone();
+	  
+	  return binNode;
+	}
+	
+	public void preOrder(LogicalNodeVisitor visitor) {
+	  visitor.visit(this);
+	  outer.postOrder(visitor);
+    inner.postOrder(visitor);    
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {
+    outer.postOrder(visitor);
+    inner.postOrder(visitor);
+    visitor.visit(this);
+  }
+
+  public String toJSON() {
+    outer.toJSON();
+    inner.toJSON();
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
new file mode 100644
index 0000000..a02f703
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.util.TUtil;
+
+public class CreateTableNode extends LogicalNode implements Cloneable {
+  @Expose private String tableName;
+  @Expose private Column[] partitionKeys;
+  @Expose private StoreType storageType;
+  @Expose private Schema schema;
+  @Expose private Path path;
+  @Expose private Options options;
+
+  public CreateTableNode(String tableName, Schema schema) {
+    super(ExprType.CREATE_TABLE);
+    this.tableName = tableName;
+    this.schema = schema;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+    
+  public Schema getSchema() {
+    return this.schema;
+  }
+
+  public void setStorageType(StoreType storageType) {
+    this.storageType = storageType;
+  }
+
+  public StoreType getStorageType() {
+    return this.storageType;
+  }
+
+  public boolean hasPath() {
+    return this.path != null;
+  }
+
+  public void setPath(Path path) {
+    this.path = path;
+  }
+  
+  public Path getPath() {
+    return this.path;
+  }
+  
+  public boolean hasOptions() {
+    return this.options != null;
+  }
+  
+  public void setOptions(Options opt) {
+    this.options = opt;
+  }
+  
+  public Options getOptions() {
+    return this.options;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof CreateTableNode) {
+      CreateTableNode other = (CreateTableNode) obj;
+      return super.equals(other)
+          && this.tableName.equals(other.tableName)
+          && this.schema.equals(other.schema)
+          && this.storageType == other.storageType
+          && this.path.equals(other.path)
+          && TUtil.checkEquals(options, other.options)
+          && TUtil.checkEquals(partitionKeys, other.partitionKeys);
+    } else {
+      return false;
+    }
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    CreateTableNode store = (CreateTableNode) super.clone();
+    store.tableName = tableName;
+    store.schema = (Schema) schema.clone();
+    store.storageType = storageType;
+    store.path = new Path(path.toString());
+    store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
+    store.options = (Options) (options != null ? options.clone() : null);
+    return store;
+  }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Store\": {\"table\": \""+tableName+"\",");
+    if (partitionKeys != null) {
+      sb.append("\"partition keys: [");
+      for (int i = 0; i < partitionKeys.length; i++) {
+        sb.append(partitionKeys[i]);
+        if (i < partitionKeys.length - 1)
+          sb.append(",");
+      }
+      sb.append("],");
+    }
+    sb.append("\"schema: \"{" + this.schema).append("}");
+    sb.append(",\"storeType\": \"" + this.storageType);
+    sb.append(",\"path\" : \"" + this.path).append("\",");
+    
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
+    .append("\n  \"in schema\": ").append(getInSchema())
+    .append("}");
+    
+    return sb.toString();
+  }
+  
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);    
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);    
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
new file mode 100644
index 0000000..c44db13
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+
+public class EvalExprNode extends LogicalNode {
+  @Expose private QueryBlock.Target[] exprs;
+
+  public EvalExprNode(QueryBlock.Target[] exprs) {
+    super(ExprType.EXPRS);
+    this.exprs = exprs;
+  }
+
+  @Override
+  public String toJSON() {
+    Gson gson = GsonCreator.getInstance();
+    return gson.toJson(this);
+  }
+  
+  public QueryBlock.Target[] getExprs() {
+    return this.exprs;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"EvalExpr\": {");
+    sb.append("\"targets\": [");
+
+    for (int i = 0; i < exprs.length; i++) {
+      sb.append("\"").append(exprs[i]).append("\"");
+      if( i < exprs.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("],");
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
+    sb.append("\n  \"in schema\": ").append(getInSchema());
+    sb.append("}");
+    return sb.toString();
+  }
+  
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    // nothing
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    // nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
new file mode 100644
index 0000000..41e606f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.json.GsonCreator;
+
+public class ExceptNode extends BinaryNode {
+
+  public ExceptNode() {
+    super(ExprType.EXCEPT);
+  }
+
+  public ExceptNode(LogicalNode outer, LogicalNode inner) {
+    this();
+    setOuter(outer);
+    setInner(inner);
+  }
+
+  public String toString() {
+    return getOuterNode().toString() + "\n EXCEPT \n" + getInnerNode().toString();
+  }
+
+  @Override
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
new file mode 100644
index 0000000..8eb70cf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExprType.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+public enum ExprType {
+  BST_INDEX_SCAN,
+  CREATE_INDEX,
+  CREATE_TABLE,	
+	DESC_TABLE,
+	EXCEPT,
+	EXPRS,
+	GROUP_BY,
+	INSERT_INTO,
+	INTERSECT,
+  LIMIT,
+	JOIN,
+	PROJECTION,
+	RECEIVE,
+	RENAME,
+	ROOT,
+	SCAN,
+	SELECTION,
+	SEND,
+	SET_DIFF, 
+	SET_UNION,
+  SET_INTERSECT,
+	SHOW_TABLE,
+	SHOW_FUNCTION,
+	SORT,
+	STORE,
+	UNION
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
new file mode 100644
index 0000000..d35db32
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Arrays;
+
+public class GroupbyNode extends UnaryNode implements Cloneable {
+	@Expose
+	private Column[] columns;
+	@Expose
+	private EvalNode havingCondition = null;
+	@Expose
+	private QueryBlock.Target[] targets;
+	
+	public GroupbyNode() {
+		super();
+	}
+	
+	public GroupbyNode(final Column [] columns) {
+		super(ExprType.GROUP_BY);
+		this.columns = columns;
+	}
+	
+	public GroupbyNode(final Column [] columns, 
+	    final EvalNode havingCondition) {
+    this(columns);
+    this.havingCondition = havingCondition;
+  }
+	
+	public final Column [] getGroupingColumns() {
+	  return this.columns;
+	}
+	
+	public final boolean hasHavingCondition() {
+	  return this.havingCondition != null;
+	}
+	
+	public final EvalNode getHavingCondition() {
+	  return this.havingCondition;
+	}
+	
+	public final void setHavingCondition(final EvalNode evalTree) {
+	  this.havingCondition = evalTree;
+	}
+	
+  public boolean hasTargetList() {
+    return this.targets != null;
+  }
+
+  public QueryBlock.Target[] getTargets() {
+    return this.targets;
+  }
+
+  public void setTargetList(QueryBlock.Target[] targets) {
+    this.targets = targets;
+  }
+  
+  public void setSubNode(LogicalNode subNode) {
+    super.setSubNode(subNode);
+  }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder("\"GroupBy\": {\"fields\":[");
+    for (int i=0; i < columns.length; i++) {
+      sb.append("\"").append(columns[i]).append("\"");
+      if(i < columns.length - 1)
+        sb.append(",");
+    }
+    
+    if(hasHavingCondition()) {
+      sb.append("], \"having qual\": \""+havingCondition+"\"");
+    }
+    if(hasTargetList()) {
+      sb.append(", \"target\": [");
+      for (int i = 0; i < targets.length; i++) {
+        sb.append("\"").append(targets[i]).append("\"");
+        if( i < targets.length - 1) {
+          sb.append(",");
+        }
+      }
+      sb.append("],");
+    }
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
+    sb.append("\n  \"in schema\": ").append(getInSchema());
+    sb.append("}");
+    
+    return sb.toString() + "\n"
+        + getSubNode().toString();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof GroupbyNode) {
+      GroupbyNode other = (GroupbyNode) obj;
+      return super.equals(other) 
+          && Arrays.equals(columns, other.columns)
+          && TUtil.checkEquals(havingCondition, other.havingCondition)
+          && TUtil.checkEquals(targets, other.targets)
+          && getSubNode().equals(other.getSubNode());
+    } else {
+      return false;  
+    }
+  }
+  
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    GroupbyNode grp = (GroupbyNode) super.clone();
+    if (columns != null) {
+      grp.columns = new Column[columns.length];
+      for (int i = 0; i < columns.length; i++) {
+        grp.columns[i] = (Column) columns[i].clone();
+      }
+    }
+    grp.havingCondition = (EvalNode) (havingCondition != null 
+        ? havingCondition.clone() : null);    
+    if (targets != null) {
+      grp.targets = new QueryBlock.Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        grp.targets[i] = (QueryBlock.Target) targets[i].clone();
+      }
+    }
+
+    return grp;
+  }
+  
+  public String toJSON() {
+    return GsonCreator.getInstance().toJson(this, LogicalNode.class);
+  }
+}