You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/26 06:46:13 UTC

[14/33] TAJO-1125: Separate logical plan and optimizer into a maven module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
deleted file mode 100644
index 666c5fc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.Stack;
-
-public class PartitionedTableRewriter implements RewriteRule {
-  private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
-
-  private static final String NAME = "Partitioned Table Rewriter";
-  private final Rewriter rewriter = new Rewriter();
-
-  private final TajoConf systemConf;
-
-  public PartitionedTableRewriter(TajoConf conf) {
-    systemConf = conf;
-  }
-
-  @Override
-  public String getName() {
-    return NAME;
-  }
-
-  @Override
-  public boolean isEligible(LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      for (RelationNode relation : block.getRelations()) {
-        if (relation.getType() == NodeType.SCAN) {
-          TableDesc table = ((ScanNode)relation).getTableDesc();
-          if (table.hasPartition()) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
-    rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
-    return plan;
-  }
-
-  private static class PartitionPathFilter implements PathFilter {
-    private Schema schema;
-    private EvalNode partitionFilter;
-
-
-    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
-      this.schema = schema;
-      this.partitionFilter = partitionFilter;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
-      if (tuple == null) { // if it is a file or not acceptable file
-        return false;
-      }
-
-      return partitionFilter.eval(schema, tuple).asBool();
-    }
-
-    @Override
-    public String toString() {
-      return partitionFilter.toString();
-    }
-  }
-
-  /**
-   * It assumes that each conjunctive form corresponds to one column.
-   *
-   * @param partitionColumns
-   * @param conjunctiveForms search condition corresponding to partition columns.
-   *                         If it is NULL, it means that there is no search condition for this table.
-   * @param tablePath
-   * @return
-   * @throws IOException
-   */
-  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
-      throws IOException {
-
-    FileSystem fs = tablePath.getFileSystem(systemConf);
-
-    PathFilter [] filters;
-    if (conjunctiveForms == null) {
-      filters = buildAllAcceptingPathFilters(partitionColumns);
-    } else {
-      filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
-    }
-
-    // loop from one to the number of partition columns
-    Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
-
-    for (int i = 1; i < partitionColumns.size(); i++) {
-      // Get all file status matched to a ith level path filter.
-      filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
-    }
-
-    LOG.info("Filtered directory or files: " + filteredPaths.length);
-    return filteredPaths;
-  }
-
-  /**
-   * Build path filters for all levels with a list of filter conditions.
-   *
-   * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
-   * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
-   *
-   * Corresponding filter conditions will be placed on each path filter,
-   * If there is no corresponding expression for certain column,
-   * The condition will be filled with a true value.
-   *
-   * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'.
-   * There is no filter condition corresponding to col2.
-   * Then, the path filter conditions are corresponding to the followings:
-   *
-   * The first path filter: col1 = 'A'
-   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
-   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
-   *
-   * 'IS NOT NULL' predicate is always true against the partition path.
-   *
-   * @param partitionColumns
-   * @param conjunctiveForms
-   * @return
-   */
-  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
-                                                     EvalNode [] conjunctiveForms) {
-    // Building partition path filters for all levels
-    Column target;
-    PathFilter [] filters = new PathFilter[partitionColumns.size()];
-    List<EvalNode> accumulatedFilters = Lists.newArrayList();
-    for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
-      target = partitionColumns.getColumn(i);
-
-      for (EvalNode expr : conjunctiveForms) {
-        if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
-          // Accumulate one qual per level
-          accumulatedFilters.add(expr);
-        }
-      }
-
-      if (accumulatedFilters.size() < (i + 1)) {
-        accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
-      }
-
-      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
-          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
-      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
-    }
-    return filters;
-  }
-
-  /**
-   * Build an array of path filters for all levels with all accepting filter condition.
-   * @param partitionColumns The partition columns schema
-   * @return The array of path filter, accpeting all partition paths.
-   */
-  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
-    Column target;
-    PathFilter [] filters = new PathFilter[partitionColumns.size()];
-    List<EvalNode> accumulatedFilters = Lists.newArrayList();
-    for (int i = 0; i < partitionColumns.size(); i++) { // loop from one to level
-      target = partitionColumns.getColumn(i);
-      accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
-
-      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
-          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
-      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
-    }
-    return filters;
-  }
-
-  private static Path [] toPathArray(FileStatus[] fileStatuses) {
-    Path [] paths = new Path[fileStatuses.length];
-    for (int j = 0; j < fileStatuses.length; j++) {
-      paths[j] = fileStatuses[j].getPath();
-    }
-    return paths;
-  }
-
-  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
-    TableDesc table = scanNode.getTableDesc();
-    PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
-
-    Schema paritionValuesSchema = new Schema();
-    for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
-      paritionValuesSchema.addColumn(column);
-    }
-
-    Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
-
-    // if a query statement has a search condition, try to find indexable predicates
-    if (scanNode.hasQual()) {
-      EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
-      Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
-
-      // add qualifier to schema for qual
-      paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
-      for (Column column : paritionValuesSchema.getColumns()) {
-        for (EvalNode simpleExpr : conjunctiveForms) {
-          if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
-            indexablePredicateSet.add(simpleExpr);
-          }
-        }
-      }
-
-      // Partitions which are not matched to the partition filter conditions are pruned immediately.
-      // So, the partition filter conditions are not necessary later, and they are removed from
-      // original search condition for simplicity and efficiency.
-      remainExprs.removeAll(indexablePredicateSet);
-      if (remainExprs.isEmpty()) {
-        scanNode.setQual(null);
-      } else {
-        scanNode.setQual(
-            AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
-      }
-    }
-
-    if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
-      return findFilteredPaths(paritionValuesSchema,
-          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
-    } else { // otherwise, we will get all partition paths.
-      return findFilteredPaths(paritionValuesSchema, null, table.getPath());
-    }
-  }
-
-  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
-    if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
-      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
-      // if it contains only single variable matched to a target column
-      return variables.size() == 1 && variables.contains(targetColumn);
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Check if an expression consists of one variable and one constant and
-   * the expression is a comparison operator.
-   *
-   * @param evalNode The expression to be checked
-   * @return true if an expression consists of one variable and one constant
-   * and the expression is a comparison operator. Other, false.
-   */
-  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
-    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
-    return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
-  }
-
-  /**
-   *
-   * @param evalNode The expression to be checked
-   * @return true if an disjunctive expression, consisting of indexable expressions
-   */
-  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
-    if (evalNode.getType() == EvalType.OR) {
-      BinaryEval orEval = (BinaryEval) evalNode;
-      boolean indexable =
-          checkIfIndexablePredicate(orEval.getLeftExpr()) &&
-              checkIfIndexablePredicate(orEval.getRightExpr());
-
-      boolean sameVariable =
-          EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
-          .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));
-
-      return indexable && sameVariable;
-    } else {
-      return false;
-    }
-  }
-
-  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
-    if (scanNode.getInputPaths().length > 0) {
-      try {
-        FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
-        long totalVolume = 0;
-
-        for (Path input : scanNode.getInputPaths()) {
-          ContentSummary summary = fs.getContentSummary(input);
-          totalVolume += summary.getLength();
-          totalVolume += summary.getFileCount();
-        }
-        scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
-      } catch (IOException e) {
-        throw new PlanningException(e);
-      }
-    }
-  }
-
-  private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
-    @Override
-    public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
-                            Stack<LogicalNode> stack) throws PlanningException {
-
-      TableDesc table = scanNode.getTableDesc();
-      if (!table.hasPartition()) {
-        return null;
-      }
-
-      try {
-        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
-        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
-        PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
-        rewrittenScanNode.init(scanNode, filteredPaths);
-        updateTableStat(rewrittenScanNode);
-
-        // if it is topmost node, set it as the rootnode of this block.
-        if (stack.empty() || block.getRoot().equals(scanNode)) {
-          block.setRoot(rewrittenScanNode);
-        } else {
-          PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
-        }
-      } catch (IOException e) {
-        throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
-      }
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
deleted file mode 100644
index ec5df04..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ /dev/null
@@ -1,1145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import com.google.common.collect.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * ProjectionPushDownRule deploys expressions in a selection list to proper
- * {@link org.apache.tajo.engine.planner.logical.Projectable}
- * nodes. In this process, the expressions are usually pushed down into as lower as possible.
- * It also enables scanners to read only necessary columns.
- */
-public class ProjectionPushDownRule extends
-    BasicLogicalPlanVisitor<ProjectionPushDownRule.Context, LogicalNode> implements RewriteRule {
-  /** Class Logger */
-  private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class);
-  private static final String name = "ProjectionPushDown";
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public boolean isEligible(LogicalPlan plan) {
-    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
-
-    if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) {
-      return false;
-    }
-    for (QueryBlock eachBlock: plan.getQueryBlocks()) {
-      if (eachBlock.hasTableExpression()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
-
-    LogicalPlan.QueryBlock topmostBlock = rootBlock;
-
-    Stack<LogicalNode> stack = new Stack<LogicalNode>();
-    Context context = new Context(plan);
-    visit(context, plan, topmostBlock, topmostBlock.getRoot(), stack);
-
-    return plan;
-  }
-
-  /**
-   * <h2>What is TargetListManager?</h2>
-   * It manages all expressions used in a query block, and their reference names.
-   * TargetListManager provides a way to find an expression by a reference name.
-   * It keeps a set of expressions, and one or more reference names can point to
-   * the same expression.
-   *
-   * Also, TargetListManager keeps the evaluation state of each expression.
-   * The evaluation state is a boolean state to indicate whether the expression
-   * was evaluated in descendant node or not. If an expression is evaluated,
-   * the evaluation state is changed to TRUE. It also means that
-   * the expression can be referred by an column reference instead of evaluating the expression.
-   *
-   * Consider an example query:
-   *
-   * SELECT sum(l_orderkey + 1) from lineitem where l_partkey > 1;
-   *
-   * In this case, an expression sum(l_orderkey + 1) is divided into two sub expressions:
-   * <ul>
-   *  <li>$1 <- l_orderkey + 1</li>
-   *  <li>$2 <- sum($1)</li>
-   * </ul>
-   *
-   * <code>$1</code> is a simple arithmetic operation, and $2 is an aggregation function.
-   * <code>$1</code> is evaluated in ScanNode because it's just a simple arithmetic operation.
-   * So, the evaluation state of l_orderkey + 1 initially
-   * is false, but the state will be true after ScanNode.
-   *
-   * In contrast, sum($1) is evaluated at GroupbyNode. So, its evaluation state is changed
-   * after GroupByNode.
-   *
-   * <h2>Why is TargetListManager necessary?</h2>
-   *
-   * Expressions used in a query block can be divided into various categories according to
-   * the possible {@link Projectable} nodes. Their references become available depending on
-   * the Projectable node at which expressions are evaluated. It manages the expressions and
-   * references for optimized places of expressions. It performs duplicated removal and enables
-   * common expressions to be shared with two or more Projectable nodes. It also helps Projectable
-   * nodes to find correct column references.
-   */
-  public static class TargetListManager {
-    private Integer seqId = 0;
-
-    /**
-     * Why should we use LinkedHashMap for those maps ?
-     *
-     * These maps are mainly by the target list of each projectable node
-     * (i.e., ProjectionNode, GroupbyNode, JoinNode, and ScanNode).
-     * The projection node removal occurs only when the projection node's output
-     * schema and its child's output schema are equivalent to each other.
-     *
-     * If we keep the inserted order of all expressions. It would make the possibility
-     * of projection node removal higher.
-     **/
-
-    /** A Map: Name -> Id */
-    private LinkedHashMap<String, Integer> nameToIdBiMap;
-    /** Map: Id <-> EvalNode */
-    private BiMap<Integer, EvalNode> idToEvalBiMap;
-    /** Map: Id -> Names */
-    private LinkedHashMap<Integer, List<String>> idToNamesMap;
-    /** Map: Id -> Boolean */
-    private LinkedHashMap<Integer, Boolean> evaluationStateMap;
-    /** Map: alias name -> Id */
-    private LinkedHashMap<String, Integer> aliasMap;
-
-    private LogicalPlan plan;
-
-    public TargetListManager(LogicalPlan plan) {
-      this.plan = plan;
-      nameToIdBiMap = Maps.newLinkedHashMap();
-      idToEvalBiMap = HashBiMap.create();
-      idToNamesMap = Maps.newLinkedHashMap();
-      evaluationStateMap = Maps.newLinkedHashMap();
-      aliasMap = Maps.newLinkedHashMap();
-    }
-
-    private int getNextSeqId() {
-      return seqId++;
-    }
-
-    /**
-     * If some expression is duplicated, we call an alias indicating the duplicated expression 'native alias'.
-     * This method checks whether a reference is native alias or not.
-     *
-     * @param name The reference name
-     * @return True if the reference is native alias. Otherwise, it will return False.
-     */
-    public boolean isNativeAlias(String name) {
-      return aliasMap.containsKey(name);
-    }
-
-    /**
-     * This method retrieves the name indicating actual expression that an given alias indicate.
-     *
-     * @param name an alias name
-     * @return Real reference name
-     */
-    public String getRealReferenceName(String name) {
-      int refId = aliasMap.get(name);
-      return getPrimaryName(refId);
-    }
-
-    /**
-     * Add an expression with a specified name, which is usually an alias.
-     * Later, you can refer this expression by the specified name.
-     */
-    private String add(String specifiedName, EvalNode evalNode) throws PlanningException {
-
-      // if a name already exists, it only just keeps an actual
-      // expression instead of a column reference.
-      if (nameToIdBiMap.containsKey(specifiedName)) {
-
-        int refId = nameToIdBiMap.get(specifiedName);
-        EvalNode found = idToEvalBiMap.get(refId);
-        if (found != null) {
-          if (evalNode.equals(found)) { // if input expression already exists
-            return specifiedName;
-          } else {
-            // The case where if existing reference name and a given reference name are the same to each other and
-            // existing EvalNode and a given EvalNode is the different
-            if (found.getType() != EvalType.FIELD && evalNode.getType() != EvalType.FIELD) {
-              throw new PlanningException("Duplicate alias: " + evalNode);
-            }
-
-            if (found.getType() == EvalType.FIELD) {
-              Integer daggling = idToEvalBiMap.inverse().get(evalNode);
-              idToEvalBiMap.forcePut(refId, evalNode);
-              if (daggling != null) {
-                String name = getPrimaryName(daggling);
-                idToNamesMap.remove(daggling);
-                nameToIdBiMap.put(name, refId);
-                if (!idToNamesMap.get(refId).contains(name)) {
-                  TUtil.putToNestedList(idToNamesMap, refId, name);
-                }
-              }
-            }
-          }
-        }
-      }
-
-      int refId;
-      if (idToEvalBiMap.inverse().containsKey(evalNode)) {
-        refId = idToEvalBiMap.inverse().get(evalNode);
-        aliasMap.put(specifiedName, refId);
-
-      } else {
-        refId = getNextSeqId();
-        idToEvalBiMap.put(refId, evalNode);
-        TUtil.putToNestedList(idToNamesMap, refId, specifiedName);
-        for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
-          add(new FieldEval(column));
-        }
-        evaluationStateMap.put(refId, false);
-      }
-
-      nameToIdBiMap.put(specifiedName, refId);
-
-      return specifiedName;
-    }
-
-    /**
-     * Adds an expression without any name. It returns an automatically
-     * generated name. It can be also used for referring this expression.
-     */
-    public String add(EvalNode evalNode) throws PlanningException {
-      String name;
-
-      if (evalNode.getType() == EvalType.FIELD) {
-        FieldEval fieldEval = (FieldEval) evalNode;
-        if (nameToIdBiMap.containsKey(fieldEval.getName())) {
-          int refId = nameToIdBiMap.get(fieldEval.getName());
-          return getPrimaryName(refId);
-        }
-      }
-
-      if (idToEvalBiMap.inverse().containsKey(evalNode)) {
-        int refId = idToEvalBiMap.inverse().get(evalNode);
-        return getPrimaryName(refId);
-      }
-
-      if (evalNode.getType() == EvalType.FIELD) {
-        FieldEval fieldEval = (FieldEval) evalNode;
-        name = fieldEval.getName();
-      } else {
-        name = plan.generateUniqueColumnName(evalNode);
-      }
-
-      return add(name, evalNode);
-    }
-
-    public Collection<String> getNames() {
-      return nameToIdBiMap.keySet();
-    }
-
-    public String add(Target target) throws PlanningException {
-      return add(target.getCanonicalName(), target.getEvalTree());
-    }
-
-    /**
-     * Each expression can have one or more names.
-     * We call a name added with an expression firstly as the primary name.
-     * It has a special meaning. Since duplicated expression in logical planning are removed,
-     * the primary name is only used for each expression during logical planning.
-     *
-     * @param refId The identifier of an expression
-     * @param name The name to check if it is the primary name.
-     * @return True if this name is the primary added name. Otherwise, False.
-     */
-    private boolean isPrimaryName(int refId, String name) {
-      if (idToNamesMap.get(refId).size() > 0) {
-        return getPrimaryName(refId).equals(name);
-      } else {
-        return false;
-      }
-    }
-
-    private String getPrimaryName(int refId) {
-      return idToNamesMap.get(refId).get(0);
-    }
-
-    public Target getTarget(String name) {
-      if (!nameToIdBiMap.containsKey(name)) {
-        throw new RuntimeException("No Such target name: " + name);
-      }
-      int id = nameToIdBiMap.get(name);
-      EvalNode evalNode = idToEvalBiMap.get(id);
-
-      // if it is a constant value, just returns a constant because it can be evaluated everywhere.
-      if (evalNode.getType() == EvalType.CONST) {
-        return new Target(evalNode, name);
-      }
-
-      // if a name is not the primary name, it means that its expression may be already evaluated and
-      // can just refer a value. Consider an example as follows:
-      //
-      // select l_orderkey + 1 as total1, l_orderkey + 1 as total2 from lineitem
-      //
-      // In this case, total2 will meet the following condition. Then, total2 can
-      // just refer the result of total1 rather than calculating l_orderkey + 1.
-      if (!isPrimaryName(id, name) && isEvaluated(getPrimaryName(id))) {
-        evalNode = new FieldEval(getPrimaryName(id), evalNode.getValueType());
-      }
-
-      // if it is a column reference itself, just returns a column reference without any alias.
-      if (evalNode.getType() == EvalType.FIELD && evalNode.getName().equals(name)) {
-        return new Target((FieldEval)evalNode);
-      } else { // otherwise, it returns an expression.
-        return new Target(evalNode, name);
-      }
-    }
-
-    public boolean isEvaluated(String name) {
-      if (!nameToIdBiMap.containsKey(name)) {
-        throw new RuntimeException("No Such target name: " + name);
-      }
-      int refId = nameToIdBiMap.get(name);
-      return evaluationStateMap.get(refId);
-    }
-
-    public void markAsEvaluated(Target target) {
-      int refId = nameToIdBiMap.get(target.getCanonicalName());
-      EvalNode evalNode = target.getEvalTree();
-      if (!idToNamesMap.containsKey(refId)) {
-        throw new RuntimeException("No such eval: " + evalNode);
-      }
-      evaluationStateMap.put(refId, true);
-    }
-
-    public Iterator<Target> getFilteredTargets(Set<String> required) {
-      return new FilteredTargetIterator(required);
-    }
-
-    class FilteredTargetIterator implements Iterator<Target> {
-      List<Target> filtered = TUtil.newList();
-      Iterator<Target> iterator;
-
-      public FilteredTargetIterator(Set<String> required) {
-        for (String name : nameToIdBiMap.keySet()) {
-          if (required.contains(name)) {
-            filtered.add(getTarget(name));
-          }
-        }
-        iterator = filtered.iterator();
-      }
-
-      @Override
-      public boolean hasNext() {
-        return iterator.hasNext();
-      }
-
-      @Override
-      public Target next() {
-        return iterator.next();
-      }
-
-      @Override
-      public void remove() {
-      }
-    }
-
-    public String toString() {
-      int evaluated = 0;
-      for (Boolean flag: evaluationStateMap.values()) {
-        if (flag) {
-          evaluated++;
-        }
-      }
-      return "eval=" + evaluationStateMap.size() + ", evaluated=" + evaluated;
-    }
-  }
-
-  static class Context {
-    TargetListManager targetListMgr;
-    Set<String> requiredSet;
-
-    public Context(LogicalPlan plan) {
-      requiredSet = new LinkedHashSet<String>();
-      targetListMgr = new TargetListManager(plan);
-    }
-
-    public Context(LogicalPlan plan, Collection<String> requiredSet) {
-      this.requiredSet = new LinkedHashSet<String>(requiredSet);
-      targetListMgr = new TargetListManager(plan);
-    }
-
-    public Context(Context upperContext) {
-      this.requiredSet = new LinkedHashSet<String>(upperContext.requiredSet);
-      targetListMgr = upperContext.targetListMgr;
-    }
-
-    public String addExpr(Target target) throws PlanningException {
-      String reference = targetListMgr.add(target);
-      addNecessaryReferences(target.getEvalTree());
-      return reference;
-    }
-
-    public String addExpr(EvalNode evalNode) throws PlanningException {
-      String reference = targetListMgr.add(evalNode);
-      addNecessaryReferences(evalNode);
-      return reference;
-    }
-
-    private void addNecessaryReferences(EvalNode evalNode) {
-      for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
-        requiredSet.add(column.getQualifiedName());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "required=" + requiredSet.size() + "," + targetListMgr.toString();
-    }
-  }
-
-  @Override
-  public LogicalNode visitRoot(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node,
-                          Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode child = super.visitRoot(context, plan, block, node, stack);
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitProjection(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                     ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-    Target [] targets = node.getTargets();
-    int targetNum = targets.length;
-    String [] referenceNames = new String[targetNum];
-    for (int i = 0; i < targetNum; i++) {
-      referenceNames[i] = newContext.addExpr(targets[i]);
-    }
-
-    LogicalNode child = super.visitProjection(newContext, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-
-    int evaluationCount = 0;
-    List<Target> finalTargets = TUtil.newList();
-    for (String referenceName : referenceNames) {
-      Target target = context.targetListMgr.getTarget(referenceName);
-
-      if (target.getEvalTree().getType() == EvalType.CONST) {
-        finalTargets.add(target);
-      } else if (context.targetListMgr.isEvaluated(referenceName)) {
-        if (context.targetListMgr.isNativeAlias(referenceName)) {
-          String realRefName = context.targetListMgr.getRealReferenceName(referenceName);
-          finalTargets.add(new Target(new FieldEval(realRefName, target.getDataType()), referenceName));
-        } else {
-          finalTargets.add(new Target(new FieldEval(target.getNamedColumn())));
-        }
-      } else if (LogicalPlanner.checkIfBeEvaluatedAtThis(target.getEvalTree(), node)) {
-        finalTargets.add(target);
-        context.targetListMgr.markAsEvaluated(target);
-        evaluationCount++;
-      }
-    }
-
-    node.setTargets(finalTargets.toArray(new Target[finalTargets.size()]));
-    LogicalPlanner.verifyProjectedFields(block, node);
-
-    // Removing ProjectionNode
-    // TODO - Consider INSERT and CTAS statement, and then remove the check of stack.empty.
-    if (evaluationCount == 0 && PlannerUtil.targetToSchema(finalTargets).equals(child.getOutSchema())) {
-      if (stack.empty()) {
-        // if it is topmost, set it as the root of this block.
-        block.setRoot(child);
-      } else {
-        LogicalNode parentNode = stack.peek();
-        switch (parentNode.getType()) {
-        case ROOT:
-          LogicalRootNode rootNode = (LogicalRootNode) parentNode;
-          rootNode.setChild(child);
-          rootNode.setInSchema(child.getOutSchema());
-          rootNode.setOutSchema(child.getOutSchema());
-          break;
-        case TABLE_SUBQUERY:
-          TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) parentNode;
-          tableSubQueryNode.setSubQuery(child);
-          break;
-        case STORE:
-          StoreTableNode storeTableNode = (StoreTableNode) parentNode;
-          storeTableNode.setChild(child);
-          storeTableNode.setInSchema(child.getOutSchema());
-          break;
-        case INSERT:
-          InsertNode insertNode = (InsertNode) parentNode;
-          insertNode.setSubQuery(child);
-          break;
-        case CREATE_TABLE:
-          CreateTableNode createTableNode = (CreateTableNode) parentNode;
-          createTableNode.setChild(child);
-          createTableNode.setInSchema(child.getOutSchema());
-          break;
-        default:
-          throw new PlanningException("Unexpected Parent Node: " + parentNode.getType());
-        }
-        plan.addHistory("ProjectionNode is eliminated.");
-      }
-
-      return child;
-
-    } else {
-      return node;
-    }
-  }
-
-  public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node,
-                           Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode child = super.visitLimit(context, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitSort(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                               SortNode node, Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-
-    final int sortKeyNum = node.getSortKeys().length;
-    String [] keyNames = new String[sortKeyNum];
-    for (int i = 0; i < sortKeyNum; i++) {
-      SortSpec sortSpec = node.getSortKeys()[i];
-      keyNames[i] = newContext.addExpr(new FieldEval(sortSpec.getSortKey()));
-    }
-
-    LogicalNode child = super.visitSort(newContext, plan, block, node, stack);
-
-    // it rewrite sortkeys. This rewrite sets right column names and eliminates duplicated sort keys.
-    List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
-    for (int i = 0; i < keyNames.length; i++) {
-      String sortKey = keyNames[i];
-      Target target = context.targetListMgr.getTarget(sortKey);
-      if (context.targetListMgr.isEvaluated(sortKey)) {
-        Column c = target.getNamedColumn();
-        SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst());
-        if (!sortSpecs.contains(sortSpec)) {
-          sortSpecs.add(sortSpec);
-        }
-      } else {
-        if (target.getEvalTree().getType() == EvalType.FIELD) {
-          Column c = ((FieldEval)target.getEvalTree()).getColumnRef();
-          SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst());
-          if (!sortSpecs.contains(sortSpec)) {
-            sortSpecs.add(sortSpec);
-          }
-        }
-      }
-    }
-    node.setSortSpecs(sortSpecs.toArray(new SortSpec[sortSpecs.size()]));
-
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitHaving(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
-                            Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-    String referenceName = newContext.targetListMgr.add(node.getQual());
-    newContext.addNecessaryReferences(node.getQual());
-
-    LogicalNode child = super.visitHaving(newContext, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-
-    Target target = context.targetListMgr.getTarget(referenceName);
-    if (newContext.targetListMgr.isEvaluated(referenceName)) {
-      node.setQual(new FieldEval(target.getNamedColumn()));
-    } else {
-      node.setQual(target.getEvalTree());
-      newContext.targetListMgr.markAsEvaluated(target);
-    }
-
-    return node;
-  }
-
-  public LogicalNode visitWindowAgg(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
-                        Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-
-    if (node.hasPartitionKeys()) {
-      for (Column c : node.getPartitionKeys()) {
-        newContext.addNecessaryReferences(new FieldEval(c));
-      }
-    }
-
-    if (node.hasSortSpecs()) {
-      for (SortSpec sortSpec : node.getSortSpecs()) {
-        newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey()));
-      }
-    }
-
-    for (WindowFunctionEval winFunc : node.getWindowFunctions()) {
-      if (winFunc.hasSortSpecs()) {
-        for (SortSpec sortSpec : winFunc.getSortSpecs()) {
-          newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey()));
-        }
-      }
-    }
-
-
-    int nonFunctionColumnNum = node.getTargets().length - node.getWindowFunctions().length;
-    LinkedHashSet<String> nonFunctionColumns = Sets.newLinkedHashSet();
-    for (int i = 0; i < nonFunctionColumnNum; i++) {
-      FieldEval fieldEval = (new FieldEval(node.getTargets()[i].getNamedColumn()));
-      nonFunctionColumns.add(newContext.addExpr(fieldEval));
-    }
-
-    final String [] aggEvalNames;
-    if (node.hasAggFunctions()) {
-      final int evalNum = node.getWindowFunctions().length;
-      aggEvalNames = new String[evalNum];
-      for (int evalIdx = 0, targetIdx = nonFunctionColumnNum; targetIdx < node.getTargets().length; evalIdx++,
-          targetIdx++) {
-        Target target = node.getTargets()[targetIdx];
-        WindowFunctionEval winFunc = node.getWindowFunctions()[evalIdx];
-        aggEvalNames[evalIdx] = newContext.addExpr(new Target(winFunc, target.getCanonicalName()));
-      }
-    } else {
-      aggEvalNames = null;
-    }
-
-    // visit a child node
-    LogicalNode child = super.visitWindowAgg(newContext, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-
-    List<Target> targets = Lists.newArrayList();
-    if (nonFunctionColumnNum > 0) {
-      for (String column : nonFunctionColumns) {
-        Target target = context.targetListMgr.getTarget(column);
-
-        // it rewrite grouping keys.
-        // This rewrite sets right column names and eliminates duplicated grouping keys.
-        if (context.targetListMgr.isEvaluated(column)) {
-          targets.add(new Target(new FieldEval(target.getNamedColumn())));
-        } else {
-          if (target.getEvalTree().getType() == EvalType.FIELD) {
-           targets.add(target);
-          }
-        }
-      }
-    }
-
-    // Getting projected targets
-    if (node.hasAggFunctions() && aggEvalNames != null) {
-      WindowFunctionEval [] aggEvals = new WindowFunctionEval[aggEvalNames.length];
-      int i = 0;
-      for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
-
-        String referenceName = it.next();
-        Target target = context.targetListMgr.getTarget(referenceName);
-
-        if (LogicalPlanner.checkIfBeEvaluatedAtWindowAgg(target.getEvalTree(), node)) {
-          aggEvals[i++] = target.getEvalTree();
-          context.targetListMgr.markAsEvaluated(target);
-
-          targets.add(new Target(new FieldEval(target.getNamedColumn())));
-        }
-      }
-      if (aggEvals.length > 0) {
-        node.setWindowFunctions(aggEvals);
-      }
-    }
-
-    node.setTargets(targets.toArray(new Target[targets.size()]));
-    return node;
-  }
-
-  public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
-                             Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-
-    // Getting grouping key names
-    final int groupingKeyNum = node.getGroupingColumns().length;
-    LinkedHashSet<String> groupingKeyNames = null;
-    if (groupingKeyNum > 0) {
-      groupingKeyNames = Sets.newLinkedHashSet();
-      for (int i = 0; i < groupingKeyNum; i++) {
-        FieldEval fieldEval = new FieldEval(node.getGroupingColumns()[i]);
-        groupingKeyNames.add(newContext.addExpr(fieldEval));
-      }
-    }
-
-    // Getting eval names
-
-    final String [] aggEvalNames;
-    if (node.hasAggFunctions()) {
-      final int evalNum = node.getAggFunctions().length;
-      aggEvalNames = new String[evalNum];
-      for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) {
-        Target target = node.getTargets()[targetIdx];
-        EvalNode evalNode = node.getAggFunctions()[evalIdx];
-        aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName()));
-      }
-    } else {
-      aggEvalNames = null;
-    }
-
-    // visit a child node
-    LogicalNode child = super.visitGroupBy(newContext, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-
-    List<Target> targets = Lists.newArrayList();
-    if (groupingKeyNum > 0 && groupingKeyNames != null) {
-      // Restoring grouping key columns
-      final List<Column> groupingColumns = new ArrayList<Column>();
-      for (String groupingKey : groupingKeyNames) {
-        Target target = context.targetListMgr.getTarget(groupingKey);
-
-        // it rewrite grouping keys.
-        // This rewrite sets right column names and eliminates duplicated grouping keys.
-        if (context.targetListMgr.isEvaluated(groupingKey)) {
-          Column c = target.getNamedColumn();
-          if (!groupingColumns.contains(c)) {
-            groupingColumns.add(c);
-            targets.add(new Target(new FieldEval(target.getNamedColumn())));
-          }
-        } else {
-          if (target.getEvalTree().getType() == EvalType.FIELD) {
-            Column c = ((FieldEval)target.getEvalTree()).getColumnRef();
-            if (!groupingColumns.contains(c)) {
-              groupingColumns.add(c);
-              targets.add(target);
-              context.targetListMgr.markAsEvaluated(target);
-            }
-          } else {
-            throw new PlanningException("Cannot evaluate this expression in grouping keys: " + target.getEvalTree());
-          }
-        }
-      }
-
-      node.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()]));
-    }
-
-    // Getting projected targets
-    if (node.hasAggFunctions() && aggEvalNames != null) {
-      AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length];
-      int i = 0;
-      for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
-
-        String referenceName = it.next();
-        Target target = context.targetListMgr.getTarget(referenceName);
-
-        if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
-          aggEvals[i++] = target.getEvalTree();
-          context.targetListMgr.markAsEvaluated(target);
-        }
-      }
-      if (aggEvals.length > 0) {
-        node.setAggFunctions(aggEvals);
-      }
-    }
-    Target [] finalTargets = buildGroupByTarget(node, targets, aggEvalNames);
-    node.setTargets(finalTargets);
-
-    LogicalPlanner.verifyProjectedFields(block, node);
-
-    return node;
-  }
-
-  public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, @Nullable List<Target> groupingKeyTargets,
-                                             String [] aggEvalNames) {
-    final int groupingKeyNum =
-        groupingKeyTargets == null ? groupbyNode.getGroupingColumns().length : groupingKeyTargets.size();
-    final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0;
-    EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions();
-    Target [] targets = new Target[groupingKeyNum + aggrFuncNum];
-
-    if (groupingKeyTargets != null) {
-      for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
-        targets[groupingKeyIdx] = groupingKeyTargets.get(groupingKeyIdx);
-      }
-    } else {
-      for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
-        targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx]));
-      }
-    }
-
-    if (aggEvalNames != null) {
-      for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) {
-        targets[targetIdx] =
-            new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType()));
-      }
-    }
-
-    return targets;
-  }
-
-  public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                 SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-    String referenceName = newContext.targetListMgr.add(node.getQual());
-    newContext.addNecessaryReferences(node.getQual());
-
-    LogicalNode child = super.visitFilter(newContext, plan, block, node, stack);
-
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-
-    Target target = context.targetListMgr.getTarget(referenceName);
-    if (newContext.targetListMgr.isEvaluated(referenceName)) {
-      node.setQual(new FieldEval(target.getNamedColumn()));
-    } else {
-      node.setQual(target.getEvalTree());
-      newContext.targetListMgr.markAsEvaluated(target);
-    }
-
-    return node;
-  }
-
-  private static void pushDownIfComplexTermInJoinCondition(Context ctx, EvalNode cnf, EvalNode term)
-      throws PlanningException {
-
-    // If one of both terms in a binary operator is a complex expression, the binary operator will require
-    // multiple phases. In this case, join cannot evaluate a binary operator.
-    // So, we should prevent dividing the binary operator into more subexpressions.
-    if (term.getType() != EvalType.FIELD &&
-        !(term instanceof BinaryEval) &&
-        !(term.getType() == EvalType.ROW_CONSTANT)) {
-      String refName = ctx.addExpr(term);
-      EvalTreeUtil.replace(cnf, term, new FieldEval(refName, term.getValueType()));
-    }
-  }
-
-  public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
-                          Stack<LogicalNode> stack) throws PlanningException {
-    Context newContext = new Context(context);
-
-    String joinQualReference = null;
-    if (node.hasJoinQual()) {
-      for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(node.getJoinQual())) {
-        if (eachQual instanceof BinaryEval) {
-          BinaryEval binaryQual = (BinaryEval) eachQual;
-
-          for (int i = 0; i < 2; i++) {
-            EvalNode term = binaryQual.getChild(i);
-            pushDownIfComplexTermInJoinCondition(newContext, eachQual, term);
-          }
-        }
-      }
-
-      joinQualReference = newContext.addExpr(node.getJoinQual());
-      newContext.addNecessaryReferences(node.getJoinQual());
-    }
-
-    String [] referenceNames = null;
-    if (node.hasTargets()) {
-      referenceNames = new String[node.getTargets().length];
-      int i = 0;
-      for (Iterator<Target> it = getFilteredTarget(node.getTargets(), context.requiredSet); it.hasNext();) {
-        Target target = it.next();
-        referenceNames[i++] = newContext.addExpr(target);
-      }
-    }
-
-    stack.push(node);
-    LogicalNode left = visit(newContext, plan, block, node.getLeftChild(), stack);
-    LogicalNode right = visit(newContext, plan, block, node.getRightChild(), stack);
-    stack.pop();
-
-    Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
-
-    node.setInSchema(merged);
-
-    if (node.hasJoinQual()) {
-      Target target = context.targetListMgr.getTarget(joinQualReference);
-      if (newContext.targetListMgr.isEvaluated(joinQualReference)) {
-        throw new PlanningException("Join condition must be evaluated in the proper Join Node: " + joinQualReference);
-      } else {
-        node.setJoinQual(target.getEvalTree());
-        newContext.targetListMgr.markAsEvaluated(target);
-      }
-    }
-
-    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-    for (Iterator<String> it = getFilteredReferences(context.targetListMgr.getNames(),
-        context.requiredSet); it.hasNext();) {
-      String referenceName = it.next();
-      Target target = context.targetListMgr.getTarget(referenceName);
-
-      if (context.targetListMgr.isEvaluated(referenceName)) {
-        Target fieldReference = new Target(new FieldEval(target.getNamedColumn()));
-        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, fieldReference.getEvalTree(), node,
-            stack.peek().getType() != NodeType.JOIN)) {
-          projectedTargets.add(fieldReference);
-        }
-      } else if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, target.getEvalTree(), node,
-          stack.peek().getType() != NodeType.JOIN)) {
-        projectedTargets.add(target);
-        context.targetListMgr.markAsEvaluated(target);
-      }
-    }
-
-    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-    LogicalPlanner.verifyProjectedFields(block, node);
-    return node;
-  }
-
-  static Iterator<String> getFilteredReferences(Collection<String> targetNames, Set<String> required) {
-    return new FilteredStringsIterator(targetNames, required);
-  }
-
-  static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) {
-    return new FilteredStringsIterator(targetNames, required);
-  }
-
-  static class FilteredStringsIterator implements Iterator<String> {
-    Iterator<String> iterator;
-
-    FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) {
-      List<String> filtered = TUtil.newList();
-      for (String name : targetNames) {
-        if (required.contains(name)) {
-          filtered.add(name);
-        }
-      }
-
-      iterator = filtered.iterator();
-    }
-
-    FilteredStringsIterator(String [] targetNames, Collection<String> required) {
-      this(TUtil.newList(targetNames), required);
-    }
-
-    @Override
-    public boolean hasNext() {
-      return iterator.hasNext();
-    }
-
-    @Override
-    public String next() {
-      return iterator.next();
-    }
-
-    @Override
-    public void remove() {
-    }
-  }
-
-  static Iterator<Target> getFilteredTarget(Target[] targets, Set<String> required) {
-    return new FilteredIterator(targets, required);
-  }
-
-  static class FilteredIterator implements Iterator<Target> {
-    Iterator<Target> iterator;
-
-    FilteredIterator(Target [] targets, Set<String> requiredReferences) {
-      List<Target> filtered = TUtil.newList();
-      Map<String, Target> targetSet = new HashMap<String, Target>();
-      for (Target t : targets) {
-        // Only should keep an raw target instead of field reference.
-        if (targetSet.containsKey(t.getCanonicalName())) {
-          Target targetInSet = targetSet.get(t.getCanonicalName());
-          EvalNode evalNode = targetInSet.getEvalTree();
-          if (evalNode.getType() == EvalType.FIELD && t.getEvalTree().getType() != EvalType.FIELD) {
-            targetSet.put(t.getCanonicalName(), t);
-          }
-        } else {
-          targetSet.put(t.getCanonicalName(), t);
-        }
-      }
-
-      for (String name : requiredReferences) {
-        if (targetSet.containsKey(name)) {
-          filtered.add(targetSet.get(name));
-        }
-      }
-
-      iterator = filtered.iterator();
-    }
-    @Override
-    public boolean hasNext() {
-      return iterator.hasNext();
-    }
-
-    @Override
-    public Target next() {
-      return iterator.next();
-    }
-
-    @Override
-    public void remove() {
-    }
-  }
-
-  @Override
-  public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
-                           Stack<LogicalNode> stack) throws PlanningException {
-
-    LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild());
-    LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild());
-
-    Context leftContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet,
-        leftBlock.getName()));
-    Context rightContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet,
-        rightBlock.getName()));
-
-    stack.push(node);
-    visit(leftContext, plan, leftBlock, leftBlock.getRoot(), new Stack<LogicalNode>());
-    visit(rightContext, plan, rightBlock, rightBlock.getRoot(), new Stack<LogicalNode>());
-    stack.pop();
-    return node;
-  }
-
-  public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
-                          Stack<LogicalNode> stack) throws PlanningException {
-
-    Context newContext = new Context(context);
-
-    Target [] targets;
-    if (node.hasTargets()) {
-      targets = node.getTargets();
-    } else {
-      targets = PlannerUtil.schemaToTargets(node.getTableSchema());
-    }
-
-    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-    for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-      newContext.addExpr(target);
-    }
-
-    for (Iterator<Target> it = context.targetListMgr.getFilteredTargets(newContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-
-      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
-        projectedTargets.add(target);
-        newContext.targetListMgr.markAsEvaluated(target);
-      }
-    }
-
-    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-    LogicalPlanner.verifyProjectedFields(block, node);
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitPartitionedTableScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                               PartitionedTableScanNode node, Stack<LogicalNode> stack)
-      throws PlanningException {
-
-    Context newContext = new Context(context);
-
-    Target [] targets;
-    if (node.hasTargets()) {
-      targets = node.getTargets();
-    } else {
-      targets = PlannerUtil.schemaToTargets(node.getOutSchema());
-    }
-
-    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-    for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-      newContext.addExpr(target);
-    }
-
-    for (Iterator<Target> it = context.targetListMgr.getFilteredTargets(newContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-
-      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
-        projectedTargets.add(target);
-        newContext.targetListMgr.markAsEvaluated(target);
-      }
-    }
-
-    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-    LogicalPlanner.verifyProjectedFields(block, node);
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                   TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
-    Context childContext = new Context(plan, upperContext.requiredSet);
-    stack.push(node);
-    LogicalNode child = super.visitTableSubQuery(childContext, plan, block, node, stack);
-    node.setSubQuery(child);
-    stack.pop();
-
-    Target [] targets;
-    if (node.hasTargets()) {
-      targets = node.getTargets();
-    } else {
-      targets = PlannerUtil.schemaToTargets(node.getOutSchema());
-    }
-
-    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
-    for (Iterator<Target> it = getFilteredTarget(targets, upperContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-      upperContext.addExpr(target);
-    }
-
-    for (Iterator<Target> it = upperContext.targetListMgr.getFilteredTargets(upperContext.requiredSet); it.hasNext();) {
-      Target target = it.next();
-
-      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
-        projectedTargets.add(target);
-        upperContext.targetListMgr.markAsEvaluated(target);
-      }
-    }
-
-    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-    LogicalPlanner.verifyProjectedFields(block, node);
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
-                            Stack<LogicalNode> stack) throws PlanningException {
-    stack.push(node);
-    visit(context, plan, block, node.getChild(), stack);
-    stack.pop();
-    return node;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
deleted file mode 100644
index cb66582..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/QueryRewriteEngine.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-
-public interface QueryRewriteEngine {
-  /**
-   * Rewrite a logical plan with all query rewrite rules added to this engine.
-   *
-   * @param plan The plan to be rewritten with all query rewrite rule.
-   * @return The rewritten plan.
-   */
-  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
deleted file mode 100644
index 89854df..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/RewriteRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-
-/**
- * An interface for a rewrite rule.
- */
-public interface RewriteRule {
-
-  /**
-   * It returns the rewrite rule name. It will be used for debugging and
-   * building a optimization history.
-   *
-   * @return The rewrite rule name
-   */
-  String getName();
-
-  /**
-   * This method checks if this rewrite rule can be applied to a given query plan.
-   * For example, the selection push down can not be applied to the query plan without any filter.
-   * In such case, it will return false.
-   *
-   * @param plan The plan to be checked
-   * @return True if this rule can be applied to a given plan. Otherwise, false.
-   */
-  boolean isEligible(LogicalPlan plan);
-
-  /**
-   * Updates a logical plan and returns an updated logical plan rewritten by this rule.
-   * It must be guaranteed that the input logical plan is not modified even after rewrite.
-   * In other words, the rewrite has to modify an plan copied from the input plan.
-   *
-   * @param plan Input logical plan. It will not be modified.
-   * @return The rewritten logical plan.
-   */
-  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index f4160e4..86fd355 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -24,8 +24,8 @@ import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.logical.NodeType;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 
@@ -132,9 +132,9 @@ public class QueryContext extends OverridableConf {
     put(QueryVars.OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
   }
 
-  public PartitionMethodDesc getPartitionMethod() {
-    return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS));
-  }
+//  public PartitionMethodDesc getPartitionMethod() {
+//    return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS));
+//  }
 
   public void setOutputOverwrite() {
     setBool(QueryVars.OUTPUT_OVERWRITE, true);
@@ -160,8 +160,8 @@ public class QueryContext extends OverridableConf {
     }
   }
 
-  public boolean isCommandType(NodeType commandType) {
-    return equalKey(QueryVars.COMMAND_TYPE, commandType.name());
+  public boolean isCommandType(String commandType) {
+    return equalKey(QueryVars.COMMAND_TYPE, commandType);
   }
 
   public void setCommandType(NodeType nodeType) {
@@ -178,7 +178,7 @@ public class QueryContext extends OverridableConf {
   }
 
   public boolean isCreateTable() {
-    return isCommandType(NodeType.CREATE_TABLE);
+    return isCommandType(NodeType.CREATE_TABLE.name());
   }
 
   public void setInsert() {
@@ -186,6 +186,6 @@ public class QueryContext extends OverridableConf {
   }
 
   public boolean isInsert() {
-    return isCommandType(NodeType.INSERT);
+    return isCommandType(NodeType.INSERT.name());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
deleted file mode 100644
index 981b572..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.utils;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-
-public class SchemaUtil {
-  // See TAJO-914 bug.
-  //
-  // Its essential problem is that constant value is evaluated multiple times at each scan.
-  // As a result, join nodes can take the child nodes which have the same named fields.
-  // Because current schema does not allow the same name and ignore the duplicated schema,
-  // it finally causes the in-out schema mismatch between the parent and child nodes.
-  //
-  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields.
-  // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
-  static int tmpColumnSeq = 0;
-  public static Schema merge(Schema left, Schema right) {
-    Schema merged = new Schema();
-    for(Column col : left.getColumns()) {
-      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn(col);
-      }
-    }
-    for(Column col : right.getColumns()) {
-      if (merged.containsByQualifiedName(col.getQualifiedName())) {
-        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
-      } else {
-        merged.addColumn(col);
-      }
-    }
-
-    // if overflow
-    if (tmpColumnSeq < 0) {
-      tmpColumnSeq = 0;
-    }
-    return merged;
-  }
-
-  /**
-   * Get common columns to be used as join keys of natural joins.
-   */
-  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
-    Schema common = new Schema();
-    for (Column outer : left.getColumns()) {
-      if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
-        common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
-      }
-    }
-    
-    return common;
-  }
-
-  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
-    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
-    if (tableName != null) {
-      logicalSchema.setQualifier(tableName);
-    }
-    return logicalSchema;
-  }
-
-  public static <T extends Schema> T clone(Schema schema) {
-    try {
-      T copy = (T) schema.clone();
-      return copy;
-    } catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 0752e11..aeb4e05 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -31,7 +31,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.Tuple;
@@ -215,68 +215,4 @@ public class TupleUtil {
       return results;
     }
   }
-
-  /**
-   * Take a look at a column partition path. A partition path consists
-   * of a table path part and column values part. This method transforms
-   * a partition path into a tuple with a given partition column schema.
-   *
-   * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
-   *                   ^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^^^^^
-   *                      table path part        column values part
-   *
-   * When a file path is given, it can perform two ways depending on beNullIfFile flag.
-   * If it is true, it returns NULL when a given path is a file.
-   * Otherwise, it returns a built tuple regardless of file or directory.
-   *
-   * @param partitionColumnSchema The partition column schema
-   * @param partitionPath The partition path
-   * @param beNullIfFile If true, this method returns NULL when a given path is a file.
-   * @return The tuple transformed from a column values part.
-   */
-  public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
-                                                  boolean beNullIfFile) {
-    int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
-
-    if (startIdx == -1) { // if there is no partition column in the patch
-      return null;
-    }
-    String columnValuesPart = partitionPath.toString().substring(startIdx);
-
-    String [] columnValues = columnValuesPart.split("/");
-
-    // true means this is a file.
-    if (beNullIfFile && partitionColumnSchema.size() < columnValues.length) {
-      return null;
-    }
-
-    Tuple tuple = new VTuple(partitionColumnSchema.size());
-    int i = 0;
-    for (; i < columnValues.length && i < partitionColumnSchema.size(); i++) {
-      String [] parts = columnValues[i].split("=");
-      if (parts.length != 2) {
-        return null;
-      }
-      int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
-      Column keyColumn = partitionColumnSchema.getColumn(columnId);
-      tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName(parts[1])));
-    }
-    for (; i < partitionColumnSchema.size(); i++) {
-      tuple.put(i, NullDatum.get());
-    }
-    return tuple;
-  }
-
-  /**
-   * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
-   * Then, you will get a string 'col1='.
-   *
-   * @param partitionColumn the schema of column partition
-   * @return The first part string of column partition path.
-   */
-  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
-    StringBuilder sb = new StringBuilder();
-    sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
-    return sb.toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 333df11..9787276 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.utils.test;
 
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.rewrite.RewriteRule;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.rewrite.RewriteRule;
 
 public class ErrorInjectionRewriter implements RewriteRule {
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 382b789..cd3be98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
@@ -37,20 +40,13 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.exception.IllegalQueryStatusException;
-import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
@@ -60,6 +56,14 @@ import org.apache.tajo.master.querymaster.QueryInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
+import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
+import org.apache.tajo.plan.verifier.VerificationState;
+import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -71,8 +75,8 @@ import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 
 public class GlobalEngine extends AbstractService {
   /** Class Logger */
@@ -267,7 +271,7 @@ public class GlobalEngine extends AbstractService {
 
       // NonFromQuery indicates a form of 'select a, x+y;'
     } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
-      Target [] targets = plan.getRootBlock().getRawTargets();
+      Target[] targets = plan.getRootBlock().getRawTargets();
       if (targets == null) {
         throw new PlanningException("No targets");
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index c0bd842..768528d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -26,11 +26,10 @@ import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.StorageManager;
@@ -77,7 +76,7 @@ public class NonForwardQueryResultScanner {
   }
 
   private void initSeqScanExec() throws IOException {
-    FragmentProto[] fragments = PlannerUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
+    FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
         currentFileIndex, MAX_FILE_NUM_PER_SCAN);
     if (fragments != null && fragments.length > 0) {
       this.taskContext = new TaskAttemptContext(

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index b8240b8..1f445ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -43,9 +43,9 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.CreateTableNode;
-import org.apache.tajo.engine.planner.logical.InsertNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.StorageManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 877a20a..536778a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index e4f47cd..bcca039 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index cb06df9..8ba9600 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -36,14 +36,14 @@ import org.apache.tajo.algebra.JsonHelper;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoMasterProtocol;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 8a3ef74..fe2752f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -33,13 +33,13 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.FragmentPair;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.Pair;