You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:41 UTC

[38/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
new file mode 100644
index 0000000..17b5d0a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.Stack;
+
+public interface LogicalPlanVisitor<CONTEXT, RESULT> {
+  RESULT visitRoot(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node,
+                   Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitProjection(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ProjectionNode node,
+                         Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitLimit(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node,
+                    Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitSort(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node,
+                   Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitHaving(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
+                      Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
+                      Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
+                     Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitJoin(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                   Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitUnion(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
+                    Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitExcept(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ExceptNode node,
+                     Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitIntersect(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IntersectNode node,
+                        Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitTableSubQuery(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, TableSubQueryNode node,
+                            Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                   Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   PartitionedTableScanNode node, Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node,
+                         Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitInsert(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
+                     Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitCreateDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateDatabaseNode node,
+                          Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitDropDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropDatabaseNode node,
+                             Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitCreateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node,
+                          Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitDropTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropTableNode node,
+                        Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitAlterTablespace(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, AlterTablespaceNode node,
+                          Stack<LogicalNode> stack) throws PlanningException;
+
+  RESULT visitAlterTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, AlterTableNode node,
+                         Stack<LogicalNode> stack) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
new file mode 100644
index 0000000..f2ddf13
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -0,0 +1,1579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.VerifyException;
+import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+import static org.apache.tajo.algebra.CreateTable.PartitionType;
+import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
+import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
+import static org.apache.tajo.engine.planner.LogicalPlanPreprocessor.PreprocessContext;
+
+/**
+ * This class creates a logical plan from a nested tajo algebra expression ({@link org.apache.tajo.algebra})
+ */
+public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContext, LogicalNode> {
+  private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
+  private final CatalogService catalog;
+  private final LogicalPlanPreprocessor preprocessor;
+  private final ExprAnnotator exprAnnotator;
+  private final ExprNormalizer normalizer;
+
+  public LogicalPlanner(CatalogService catalog) {
+    this.catalog = catalog;
+    this.exprAnnotator = new ExprAnnotator(catalog);
+    this.preprocessor = new LogicalPlanPreprocessor(catalog, exprAnnotator);
+    this.normalizer = new ExprNormalizer();
+  }
+
+  public static class PlanContext {
+    Session session;
+    LogicalPlan plan;
+
+    // transient data for each query block
+    QueryBlock queryBlock;
+
+    boolean debugOrUnitTests;
+
+    public PlanContext(Session session, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) {
+      this.session = session;
+      this.plan = plan;
+      this.queryBlock = block;
+      this.debugOrUnitTests = debugOrUnitTests;
+    }
+
+    public PlanContext(PlanContext context, QueryBlock block) {
+      this.session = context.session;
+      this.plan = context.plan;
+      this.queryBlock = block;
+      this.debugOrUnitTests = context.debugOrUnitTests;
+    }
+
+    public String toString() {
+      return "block=" + queryBlock.getName() + ", relNum=" + queryBlock.getRelations().size() + ", "+
+          queryBlock.namedExprsMgr.toString();
+    }
+  }
+
+  /**
+   * This generates a logical plan.
+   *
+   * @param expr A relational algebraic expression for a query.
+   * @return A logical plan
+   */
+  public LogicalPlan createPlan(Session session, Expr expr) throws PlanningException {
+    return createPlan(session, expr, false);
+  }
+
+  @VisibleForTesting
+  public LogicalPlan createPlan(Session session, Expr expr, boolean debug) throws PlanningException {
+
+    LogicalPlan plan = new LogicalPlan(session.getCurrentDatabase(), this);
+
+    QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
+    PreprocessContext preProcessorCtx = new PreprocessContext(session, plan, rootBlock);
+    preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr);
+
+    PlanContext context = new PlanContext(session, plan, plan.getRootBlock(), debug);
+    LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
+
+    // Add Root Node
+    LogicalRootNode root = plan.createNode(LogicalRootNode.class);
+    root.setInSchema(topMostNode.getOutSchema());
+    root.setChild(topMostNode);
+    root.setOutSchema(topMostNode.getOutSchema());
+    plan.getRootBlock().setRoot(root);
+
+    return plan;
+  }
+
+  public ExprAnnotator getExprAnnotator() {
+    return this.exprAnnotator;
+  }
+
+  public void preHook(PlanContext context, Stack<Expr> stack, Expr expr) throws PlanningException {
+    context.queryBlock.updateCurrentNode(expr);
+  }
+
+  public LogicalNode postHook(PlanContext context, Stack<Expr> stack, Expr expr, LogicalNode current)
+      throws PlanningException {
+
+
+    // Some generated logical nodes (e.g., implicit aggregation) without exprs will pass NULL as a expr parameter.
+    // We should skip them.
+    if (expr != null) {
+      // A relation list including a single ScanNode will return a ScanNode instance that already passed postHook.
+      // So, it skips the already-visited ScanNode instance.
+      if (expr.getType() == OpType.RelationList && current.getType() == NodeType.SCAN) {
+        return current;
+      }
+    }
+
+    QueryBlock queryBlock = context.queryBlock;
+    queryBlock.updateLatestNode(current);
+
+    // if this node is the topmost
+    if (stack.size() == 0) {
+      queryBlock.setRoot(current);
+    }
+
+    if (!stack.empty()) {
+      queryBlock.updateCurrentNode(stack.peek());
+    }
+    return current;
+  }
+
+  public LogicalNode visitExplain(PlanContext ctx, Stack<Expr> stack, Explain expr) throws PlanningException {
+    ctx.plan.setExplain();
+    return visit(ctx, stack, expr.getChild());
+  }
+
+  /*===============================================================================================
+    Data Manupulation Language (DML) SECTION
+   ===============================================================================================*/
+
+
+  /*===============================================================================================
+    PROJECTION SECTION
+   ===============================================================================================*/
+  @Override
+  public LogicalNode visitProjection(PlanContext context, Stack<Expr> stack, Projection projection)
+      throws PlanningException {
+
+
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    // If a non-from statement is given
+    if (!projection.hasChild()) {
+      return buildPlanForNoneFromStatement(context, stack, projection);
+    }
+
+    String [] referenceNames;
+    // in prephase, insert all target list into NamedExprManagers.
+    // Then it gets reference names, each of which points an expression in target list.
+    referenceNames = doProjectionPrephase(context, projection);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(projection);
+    LogicalNode child = visit(context, stack, projection.getChild());
+
+    // check if it is implicit aggregation. If so, it inserts group-by node to its child.
+    if (block.isAggregationRequired()) {
+      child = insertGroupbyNode(context, child, stack);
+    }
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    ProjectionNode projectionNode;
+    Target [] targets;
+    targets = buildTargets(plan, block, referenceNames);
+
+    // Set ProjectionNode
+    projectionNode = context.queryBlock.getNodeFromExpr(projection);
+    projectionNode.setInSchema(child.getOutSchema());
+    projectionNode.setTargets(targets);
+    projectionNode.setChild(child);
+
+    if (projection.isDistinct() && block.hasNode(NodeType.GROUP_BY)) {
+      throw new VerifyException("Cannot support grouping and distinct at the same time yet");
+    } else {
+      if (projection.isDistinct()) {
+        insertDistinctOperator(context, projectionNode, child, stack);
+      }
+    }
+
+    // It's for debugging and unit tests purpose.
+    // It sets raw targets, all of them are raw expressions instead of references.
+    if (context.debugOrUnitTests) {
+      setRawTargets(context, targets, referenceNames, projection);
+    }
+
+    verifyProjectedFields(block, projectionNode);
+    return projectionNode;
+  }
+
+  private void setRawTargets(PlanContext context, Target[] targets, String[] referenceNames,
+                             Projection projection) throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    // It's for debugging or unit tests.
+    Target [] rawTargets = new Target[projection.getNamedExprs().length];
+    for (int i = 0; i < projection.getNamedExprs().length; i++) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
+      EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+      rawTargets[i] = new Target(evalNode, referenceNames[i]);
+    }
+    // it's for debugging or unit testing
+    block.setRawTargets(rawTargets);
+  }
+
+  private void insertDistinctOperator(PlanContext context, ProjectionNode projectionNode, LogicalNode child,
+                                      Stack<Expr> stack) throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    Schema outSchema = projectionNode.getOutSchema();
+    GroupbyNode dupRemoval = context.plan.createNode(GroupbyNode.class);
+    dupRemoval.setChild(child);
+    dupRemoval.setInSchema(projectionNode.getInSchema());
+    dupRemoval.setTargets(PlannerUtil.schemaToTargets(outSchema));
+    dupRemoval.setGroupingColumns(outSchema.toArray());
+
+    block.registerNode(dupRemoval);
+    postHook(context, stack, null, dupRemoval);
+
+    projectionNode.setChild(dupRemoval);
+    projectionNode.setInSchema(dupRemoval.getOutSchema());
+  }
+
+  private String [] doProjectionPrephase(PlanContext context, Projection projection) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    int finalTargetNum = projection.size();
+    String [] referenceNames = new String[finalTargetNum];
+    ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[finalTargetNum];
+    NamedExpr namedExpr;
+    for (int i = 0; i < finalTargetNum; i++) {
+      namedExpr = projection.getNamedExprs()[i];
+
+      if (PlannerUtil.existsAggregationFunction(namedExpr)) {
+        block.setAggregationRequire();
+      }
+      // dissect an expression into multiple parts (at most dissected into three parts)
+      normalizedExprList[i] = normalizer.normalize(context, namedExpr.getExpr());
+    }
+
+    // Note: Why separate normalization and add(Named)Expr?
+    //
+    // ExprNormalizer internally makes use of the named exprs in NamedExprsManager.
+    // If we don't separate normalization work and addExprWithName, addExprWithName will find named exprs evaluated
+    // the same logical node. It will cause impossible evaluation in physical executors.
+    for (int i = 0; i < finalTargetNum; i++) {
+      namedExpr = projection.getNamedExprs()[i];
+      // Get all projecting references
+      if (namedExpr.hasAlias()) {
+        NamedExpr aliasedExpr = new NamedExpr(normalizedExprList[i].baseExpr, namedExpr.getAlias());
+        referenceNames[i] = block.namedExprsMgr.addNamedExpr(aliasedExpr);
+      } else {
+        referenceNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
+      }
+
+      // Add sub-expressions (i.e., aggregation part and scalar part) from dissected parts.
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
+    }
+
+    return referenceNames;
+  }
+
+  /**
+   * It builds non-from statement (only expressions) like '<code>SELECT 1+3 as plus</code>'.
+   */
+  private EvalExprNode buildPlanForNoneFromStatement(PlanContext context, Stack<Expr> stack, Projection projection)
+      throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    int finalTargetNum = projection.getNamedExprs().length;
+    Target [] targets = new Target[finalTargetNum];
+
+    for (int i = 0; i < targets.length; i++) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
+      EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+      if (namedExpr.hasAlias()) {
+        targets[i] = new Target(evalNode, namedExpr.getAlias());
+      } else {
+        targets[i] = new Target(evalNode, context.plan.generateUniqueColumnName(namedExpr.getExpr()));
+      }
+    }
+    EvalExprNode evalExprNode = context.queryBlock.getNodeFromExpr(projection);
+    evalExprNode.setTargets(targets);
+    evalExprNode.setOutSchema(PlannerUtil.targetToSchema(targets));
+    // it's for debugging or unit testing
+    block.setRawTargets(targets);
+    return evalExprNode;
+  }
+
+  private Target [] buildTargets(LogicalPlan plan, QueryBlock block, String[] referenceNames)
+      throws PlanningException {
+    Target [] targets = new Target[referenceNames.length];
+
+    for (int i = 0; i < referenceNames.length; i++) {
+      if (block.namedExprsMgr.isEvaluated(referenceNames[i])) {
+        targets[i] = block.namedExprsMgr.getTarget(referenceNames[i]);
+      } else {
+        NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referenceNames[i]);
+        EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        block.namedExprsMgr.markAsEvaluated(referenceNames[i], evalNode);
+        targets[i] = new Target(evalNode, referenceNames[i]);
+      }
+    }
+    return targets;
+  }
+
+  public static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
+    if (projectable instanceof ProjectionNode && block.hasNode(NodeType.GROUP_BY)) {
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!projectable.getInSchema().contains(c)) {
+            throw new PlanningException(c.getQualifiedName()
+                + " must appear in the GROUP BY clause or be used in an aggregate function at node ("
+                + projectable.getPID() + ")" );
+          }
+        }
+      }
+    } else  if (projectable instanceof GroupbyNode) {
+      GroupbyNode groupbyNode = (GroupbyNode) projectable;
+      // It checks if all column references within each target can be evaluated with the input schema.
+      int groupingColumnNum = groupbyNode.getGroupingColumns().length;
+      for (int i = 0; i < groupingColumnNum; i++) {
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(groupbyNode.getTargets()[i].getEvalTree());
+        if (!projectable.getInSchema().containsAll(columns)) {
+          throw new PlanningException(String.format("Cannot get the field(s) \"%s\" at node (%d)",
+              TUtil.collectionToString(columns), projectable.getPID()));
+        }
+      }
+      if (groupbyNode.hasAggFunctions()) {
+        for (AggregationFunctionCallEval f : groupbyNode.getAggFunctions()) {
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(f);
+          for (Column c : columns) {
+            if (!projectable.getInSchema().contains(c)) {
+              throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
+                  c, projectable.getPID()));
+            }
+          }
+        }
+      }
+    } else if (projectable instanceof RelationNode) {
+      RelationNode relationNode = (RelationNode) projectable;
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!relationNode.getTableSchema().contains(c)) {
+            throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
+                c, projectable.getPID()));
+          }
+        }
+      }
+    } else {
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!projectable.getInSchema().contains(c)) {
+            throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
+                c, projectable.getPID()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Insert a group-by operator before a sort or a projection operator.
+   * It is used only when a group-by clause is not given.
+   */
+  private LogicalNode insertGroupbyNode(PlanContext context, LogicalNode child, Stack<Expr> stack)
+      throws PlanningException {
+
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+    GroupbyNode groupbyNode = context.plan.createNode(GroupbyNode.class);
+    groupbyNode.setChild(child);
+    groupbyNode.setInSchema(child.getOutSchema());
+
+    groupbyNode.setGroupingColumns(new Column[] {});
+
+    Set<String> aggEvalNames = new LinkedHashSet<String>();
+    Set<AggregationFunctionCallEval> aggEvals = new LinkedHashSet<AggregationFunctionCallEval>();
+    boolean includeDistinctFunction = false;
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
+      NamedExpr rawTarget = it.next();
+      try {
+        includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+          aggEvalNames.add(rawTarget.getAlias());
+          aggEvals.add((AggregationFunctionCallEval) evalNode);
+          block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+
+    groupbyNode.setDistinct(includeDistinctFunction);
+    groupbyNode.setAggFunctions(aggEvals.toArray(new AggregationFunctionCallEval[aggEvals.size()]));
+    Target [] targets = ProjectionPushDownRule.buildGroupByTarget(groupbyNode, null,
+        aggEvalNames.toArray(new String[aggEvalNames.size()]));
+    groupbyNode.setTargets(targets);
+
+    // this inserted group-by node doesn't pass through preprocessor. So manually added.
+    block.registerNode(groupbyNode);
+    postHook(context, stack, null, groupbyNode);
+    return groupbyNode;
+  }
+
+  /*===============================================================================================
+    SORT SECTION
+  ===============================================================================================*/
+  @Override
+  public LimitNode visitLimit(PlanContext context, Stack<Expr> stack, Limit limit) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    EvalNode firstFetNum;
+    LogicalNode child;
+    if (limit.getFetchFirstNum().getType() == OpType.Literal) {
+      firstFetNum = exprAnnotator.createEvalNode(context.plan, block, limit.getFetchFirstNum());
+
+      ////////////////////////////////////////////////////////
+      // Visit and Build Child Plan
+      ////////////////////////////////////////////////////////
+      stack.push(limit);
+      child = visit(context, stack, limit.getChild());
+      stack.pop();
+      ////////////////////////////////////////////////////////
+    } else {
+      ExprNormalizedResult normalizedResult = normalizer.normalize(context, limit.getFetchFirstNum());
+      String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
+
+      ////////////////////////////////////////////////////////
+      // Visit and Build Child Plan
+      ////////////////////////////////////////////////////////
+      stack.push(limit);
+      child = visit(context, stack, limit.getChild());
+      stack.pop();
+      ////////////////////////////////////////////////////////
+
+      if (block.namedExprsMgr.isEvaluated(referName)) {
+        firstFetNum = block.namedExprsMgr.getTarget(referName).getEvalTree();
+      } else {
+        NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
+        firstFetNum = exprAnnotator.createEvalNode(context.plan, block, namedExpr.getExpr());
+        block.namedExprsMgr.markAsEvaluated(referName, firstFetNum);
+      }
+    }
+    LimitNode limitNode = block.getNodeFromExpr(limit);
+    limitNode.setChild(child);
+    limitNode.setInSchema(child.getOutSchema());
+    limitNode.setOutSchema(child.getOutSchema());
+
+    limitNode.setFetchFirst(firstFetNum.eval(null, null).asInt8());
+
+    return limitNode;
+  }
+
+  @Override
+  public SortNode visitSort(PlanContext context, Stack<Expr> stack, Sort sort) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    int sortKeyNum = sort.getSortSpecs().length;
+    Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
+    String [] referNames = new String[sortKeyNum];
+
+    ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[sortKeyNum];
+    for (int i = 0; i < sortKeyNum; i++) {
+      normalizedExprList[i] = normalizer.normalize(context, sortSpecs[i].getKey());
+    }
+    for (int i = 0; i < sortKeyNum; i++) {
+      referNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(sort);
+    LogicalNode child = visit(context, stack, sort.getChild());
+    if (block.isAggregationRequired()) {
+      child = insertGroupbyNode(context, child, stack);
+    }
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    SortNode sortNode = block.getNodeFromExpr(sort);
+    sortNode.setChild(child);
+    sortNode.setInSchema(child.getOutSchema());
+    sortNode.setOutSchema(child.getOutSchema());
+
+
+    // Building sort keys
+    Column column;
+    SortSpec [] annotatedSortSpecs = new SortSpec[sortKeyNum];
+    for (int i = 0; i < sortKeyNum; i++) {
+      if (block.namedExprsMgr.isEvaluated(referNames[i])) {
+        column = block.namedExprsMgr.getTarget(referNames[i]).getNamedColumn();
+      } else {
+        throw new IllegalStateException("Unexpected State: " + TUtil.arrayToString(sortSpecs));
+      }
+      annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst());
+    }
+
+    sortNode.setSortSpecs(annotatedSortSpecs);
+    return sortNode;
+  }
+
+  /*===============================================================================================
+    GROUP BY SECTION
+   ===============================================================================================*/
+
+  @Override
+  public LogicalNode visitHaving(PlanContext context, Stack<Expr> stack, Having expr) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ExprNormalizedResult normalizedResult = normalizer.normalize(context, expr.getQual());
+    String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+    block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
+    block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(expr);
+    LogicalNode child = visit(context, stack, expr.getChild());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    HavingNode having = new HavingNode(context.plan.newPID());
+    having.setChild(child);
+    having.setInSchema(child.getOutSchema());
+    having.setOutSchema(child.getOutSchema());
+
+    EvalNode havingCondition;
+    if (block.namedExprsMgr.isEvaluated(referName)) {
+      havingCondition = block.namedExprsMgr.getTarget(referName).getEvalTree();
+    } else {
+      NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
+      havingCondition = exprAnnotator.createEvalNode(context.plan, block, namedExpr.getExpr());
+      block.namedExprsMgr.markAsEvaluated(referName, havingCondition);
+    }
+
+    // set having condition
+    having.setQual(havingCondition);
+
+    return having;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(PlanContext context, Stack<Expr> stack, Aggregation aggregation)
+      throws PlanningException {
+
+    // Initialization Phase:
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    // Normalize grouping keys and add normalized grouping keys to NamedExprManager
+    int groupingKeyNum = aggregation.getGroupSet()[0].getGroupingSets().length;
+    ExprNormalizedResult [] normalizedResults = new ExprNormalizedResult[groupingKeyNum];
+    for (int i = 0; i < groupingKeyNum; i++) {
+      Expr groupingKey = aggregation.getGroupSet()[0].getGroupingSets()[i];
+      normalizedResults[i] = normalizer.normalize(context, groupingKey);
+    }
+
+    String [] groupingKeyRefNames = new String[groupingKeyNum];
+    for (int i = 0; i < groupingKeyNum; i++) {
+      groupingKeyRefNames[i] = block.namedExprsMgr.addExpr(normalizedResults[i].baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedResults[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedResults[i].scalarExprs);
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(aggregation);
+    LogicalNode child = visit(context, stack, aggregation.getChild());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+    GroupbyNode groupingNode = context.queryBlock.getNodeFromExpr(aggregation);
+    groupingNode.setChild(child);
+    groupingNode.setInSchema(child.getOutSchema());
+
+    // Set grouping sets
+    Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
+    for (int i = 0; i < groupingColumns.length; i++) {
+      if (block.namedExprsMgr.isEvaluated(groupingKeyRefNames[i])) {
+        groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
+      } else {
+        throw new PlanningException("Each grouping column expression must be a scalar expression.");
+      }
+    }
+    groupingNode.setGroupingColumns(groupingColumns);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+
+    // create EvalNodes and check if each EvalNode can be evaluated here.
+    List<String> aggEvalNames = TUtil.newList();
+    List<AggregationFunctionCallEval> aggEvalNodes = TUtil.newList();
+    boolean includeDistinctFunction = false;
+    for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
+      NamedExpr namedExpr = iterator.next();
+      try {
+        includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(namedExpr.getExpr());
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, namedExpr.getExpr());
+        if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+          block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
+          aggEvalNames.add(namedExpr.getAlias());
+          aggEvalNodes.add((AggregationFunctionCallEval) evalNode);
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+    // if there is at least one distinct aggregation function
+    groupingNode.setDistinct(includeDistinctFunction);
+    groupingNode.setAggFunctions(aggEvalNodes.toArray(new AggregationFunctionCallEval[aggEvalNodes.size()]));
+
+    Target [] targets = new Target[groupingKeyNum + aggEvalNames.size()];
+
+    // In target, grouping columns will be followed by aggregation evals.
+    //
+    // col1, col2, col3,   sum(..),  agv(..)
+    // ^^^^^^^^^^^^^^^    ^^^^^^^^^^^^^^^^^^
+    //  grouping keys      aggregation evals
+
+    // Build grouping keys
+    for (int i = 0; i < groupingKeyNum; i++) {
+      Target target = block.namedExprsMgr.getTarget(groupingNode.getGroupingColumns()[i].getQualifiedName());
+      targets[i] = target;
+    }
+
+    for (int i = 0, targetIdx = groupingKeyNum; i < aggEvalNodes.size(); i++, targetIdx++) {
+      targets[targetIdx] = block.namedExprsMgr.getTarget(aggEvalNames.get(i));
+    }
+
+    groupingNode.setTargets(targets);
+    block.unsetAggregationRequire();
+
+    verifyProjectedFields(block, groupingNode);
+    return groupingNode;
+  }
+
+  private static final Column[] ALL= Lists.newArrayList().toArray(new Column[0]);
+
+  public static List<Column[]> generateCuboids(Column[] columns) {
+    int numCuboids = (int) Math.pow(2, columns.length);
+    int maxBits = columns.length;
+
+    List<Column[]> cube = Lists.newArrayList();
+    List<Column> cuboidCols;
+
+    cube.add(ALL);
+    for (int cuboidId = 1; cuboidId < numCuboids; cuboidId++) {
+      cuboidCols = Lists.newArrayList();
+      for (int j = 0; j < maxBits; j++) {
+        int bit = 1 << j;
+        if ((cuboidId & bit) == bit) {
+          cuboidCols.add(columns[j]);
+        }
+      }
+      cube.add(cuboidCols.toArray(new Column[cuboidCols.size()]));
+    }
+    return cube;
+  }
+
+  @Override
+  public SelectionNode visitFilter(PlanContext context, Stack<Expr> stack, Selection selection)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ExprNormalizedResult normalizedResult = normalizer.normalize(context, selection.getQual());
+    block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+    if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
+      throw new VerifyException("Filter condition cannot include aggregation function");
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(selection);
+    LogicalNode child = visit(context, stack, selection.getChild());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    SelectionNode selectionNode = context.queryBlock.getNodeFromExpr(selection);
+    selectionNode.setChild(child);
+    selectionNode.setInSchema(child.getOutSchema());
+    selectionNode.setOutSchema(child.getOutSchema());
+
+    // Create EvalNode for a search condition.
+    EvalNode searchCondition = exprAnnotator.createEvalNode(context.plan, block, selection.getQual());
+    EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+    // set selection condition
+    selectionNode.setQual(simplified);
+
+    return selectionNode;
+  }
+
+  /*===============================================================================================
+    JOIN SECTION
+   ===============================================================================================*/
+
+  @Override
+  public LogicalNode visitJoin(PlanContext context, Stack<Expr> stack, Join join)
+      throws PlanningException {
+    // Phase 1: Init
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    if (join.hasQual()) {
+      ExprNormalizedResult normalizedResult = normalizer.normalize(context, join.getQual());
+      block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+      if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
+        throw new VerifyException("Filter condition cannot include aggregation function");
+      }
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(join);
+    LogicalNode left = visit(context, stack, join.getLeft());
+    LogicalNode right = visit(context, stack, join.getRight());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    JoinNode joinNode = context.queryBlock.getNodeFromExpr(join);
+    joinNode.setJoinType(join.getJoinType());
+    joinNode.setLeftChild(left);
+    joinNode.setRightChild(right);
+
+    // Set A merged input schema
+    Schema merged;
+    if (join.isNatural()) {
+      merged = getNaturalJoinSchema(left, right);
+    } else {
+      merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+    }
+    joinNode.setInSchema(merged);
+
+    // Create EvalNode for a search condition.
+    EvalNode joinCondition = null;
+    if (join.hasQual()) {
+      EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, block, join.getQual());
+      joinCondition = AlgebraicUtil.eliminateConstantExprs(evalNode);
+    }
+
+    List<String> newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(plan, block, joinNode, stack);
+    List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
+
+    for (String newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+    joinNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    // Determine join conditions
+    if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
+      EvalNode njCond = getNaturalJoinCondition(joinNode);
+      joinNode.setJoinQual(njCond);
+    } else if (join.hasQual()) { // otherwise, the given join conditions are set
+      joinNode.setJoinQual(joinCondition);
+    }
+
+    return joinNode;
+  }
+
+  private List<String> getNewlyEvaluatedExprsForJoin(LogicalPlan plan, QueryBlock block, JoinNode joinNode,
+                                                   Stack<Expr> stack) {
+    EvalNode evalNode;
+    List<String> newlyEvaluatedExprs = TUtil.newList();
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
+      NamedExpr namedExpr = it.next();
+      try {
+        evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, evalNode, joinNode, stack.peek().getType() != OpType.Join)) {
+          block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(namedExpr.getAlias());
+        }
+      } catch (VerifyException ve) {} catch (PlanningException e) {
+        e.printStackTrace();
+      }
+    }
+    return newlyEvaluatedExprs;
+  }
+
+  private static Schema getNaturalJoinSchema(LogicalNode left, LogicalNode right) {
+    Schema joinSchema = new Schema();
+    Schema commons = SchemaUtil.getNaturalJoinColumns(left.getOutSchema(), right.getOutSchema());
+    joinSchema.addColumns(commons);
+    for (Column c : left.getOutSchema().getColumns()) {
+      if (!joinSchema.contains(c.getQualifiedName())) {
+        joinSchema.addColumn(c);
+      }
+    }
+
+    for (Column c : right.getOutSchema().getColumns()) {
+      if (!joinSchema.contains(c.getQualifiedName())) {
+        joinSchema.addColumn(c);
+      }
+    }
+    return joinSchema;
+  }
+
+  private static EvalNode getNaturalJoinCondition(JoinNode joinNode) {
+    Schema leftSchema = joinNode.getLeftChild().getInSchema();
+    Schema rightSchema = joinNode.getRightChild().getInSchema();
+    Schema commons = SchemaUtil.getNaturalJoinColumns(leftSchema, rightSchema);
+
+    EvalNode njQual = null;
+    EvalNode equiQual;
+    Column leftJoinKey;
+    Column rightJoinKey;
+
+    for (Column common : commons.getColumns()) {
+      leftJoinKey = leftSchema.getColumn(common.getQualifiedName());
+      rightJoinKey = rightSchema.getColumn(common.getQualifiedName());
+      equiQual = new BinaryEval(EvalType.EQUAL,
+          new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
+      if (njQual == null) {
+        njQual = equiQual;
+      } else {
+        njQual = new BinaryEval(EvalType.AND, njQual, equiQual);
+      }
+    }
+
+    return njQual;
+  }
+
+  private LogicalNode createCartesianProduct(PlanContext context, LogicalNode left, LogicalNode right)
+      throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+    JoinNode join = plan.createNode(JoinNode.class);
+    join.init(JoinType.CROSS, left, right);
+    join.setInSchema(merged);
+
+    EvalNode evalNode;
+    List<String> newlyEvaluatedExprs = TUtil.newList();
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
+      NamedExpr namedExpr = it.next();
+      try {
+        evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() == 0) {
+          block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(namedExpr.getAlias());
+        }
+      } catch (VerifyException ve) {}
+    }
+
+    List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
+    for (String newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+    join.setTargets(targets.toArray(new Target[targets.size()]));
+    return join;
+  }
+
+  @Override
+  public LogicalNode visitRelationList(PlanContext context, Stack<Expr> stack, RelationList relations)
+      throws PlanningException {
+
+    LogicalNode current = visit(context, stack, relations.getRelations()[0]);
+
+    LogicalNode left;
+    LogicalNode right;
+    if (relations.size() > 1) {
+
+      for (int i = 1; i < relations.size(); i++) {
+        left = current;
+        right = visit(context, stack, relations.getRelations()[i]);
+        current = createCartesianProduct(context, left, right);
+      }
+    }
+    context.queryBlock.registerNode(current);
+
+    return current;
+  }
+
+  @Override
+  public ScanNode visitRelation(PlanContext context, Stack<Expr> stack, Relation expr)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ScanNode scanNode = block.getNodeFromExpr(expr);
+    updatePhysicalInfo(scanNode.getTableDesc());
+
+    // Find expression which can be evaluated at this relation node.
+    // Except for column references, additional expressions used in select list, where clause, order-by clauses
+    // can be evaluated here. Their reference names are kept in newlyEvaluatedExprsRef.
+    Set<String> newlyEvaluatedExprsReferences = new LinkedHashSet<String>();
+    for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
+      NamedExpr rawTarget = iterator.next();
+      try {
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (checkIfBeEvaluatedAtRelation(block, evalNode, scanNode)) {
+          block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
+          newlyEvaluatedExprsReferences.add(rawTarget.getAlias()); // newly added exr
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+
+    // Assume that each unique expr is evaluated once.
+    LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, scanNode, newlyEvaluatedExprsReferences);
+
+    // The fact the some expr is included in newlyEvaluatedExprsReferences means that it is already evaluated.
+    // So, we get a raw expression and then creates a target.
+    for (String reference : newlyEvaluatedExprsReferences) {
+      NamedExpr refrrer = block.namedExprsMgr.getNamedExpr(reference);
+      EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, block, refrrer.getExpr());
+      targets.add(new Target(evalNode, reference));
+    }
+
+    scanNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    verifyProjectedFields(block, scanNode);
+    return scanNode;
+  }
+
+  private static LinkedHashSet<Target> createFieldTargetsFromRelation(QueryBlock block, RelationNode relationNode,
+                                                      Set<String> newlyEvaluatedRefNames) {
+    LinkedHashSet<Target> targets = Sets.newLinkedHashSet();
+    for (Column column : relationNode.getTableSchema().getColumns()) {
+      String aliasName = block.namedExprsMgr.checkAndGetIfAliasedColumn(column.getQualifiedName());
+      if (aliasName != null) {
+        targets.add(new Target(new FieldEval(column), aliasName));
+        newlyEvaluatedRefNames.remove(aliasName);
+      } else {
+        targets.add(new Target(new FieldEval(column)));
+      }
+    }
+    return targets;
+  }
+
+  private void updatePhysicalInfo(TableDesc desc) {
+    if (desc.getPath() != null) {
+      try {
+        FileSystem fs = desc.getPath().getFileSystem(new Configuration());
+        FileStatus status = fs.getFileStatus(desc.getPath());
+        if (desc.getStats() != null && (status.isDirectory() || status.isFile())) {
+          ContentSummary summary = fs.getContentSummary(desc.getPath());
+          if (summary != null) {
+            long volume = summary.getLength();
+            desc.getStats().setNumBytes(volume);
+          }
+        }
+      } catch (Throwable t) {
+        LOG.warn(t);
+      }
+    }
+  }
+
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<Expr> stack, TablePrimarySubQuery expr)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    QueryBlock childBlock = context.plan.getBlock(context.plan.getBlockNameByExpr(expr.getSubQuery()));
+    PlanContext newContext = new PlanContext(context, childBlock);
+    LogicalNode child = visit(newContext, new Stack<Expr>(), expr.getSubQuery());
+    TableSubQueryNode subQueryNode = context.queryBlock.getNodeFromExpr(expr);
+    context.plan.connectBlocks(childBlock, context.queryBlock, BlockType.TableSubQuery);
+    subQueryNode.setSubQuery(child);
+
+    // Add additional expressions required in upper nodes.
+    Set<String> newlyEvaluatedExprs = TUtil.newHashSet();
+    for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+      try {
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (checkIfBeEvaluatedAtRelation(block, evalNode, subQueryNode)) {
+          block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(rawTarget.getAlias()); // newly added exr
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+
+    // Assume that each unique expr is evaluated once.
+    LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, subQueryNode, newlyEvaluatedExprs);
+
+    for (String newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+
+    subQueryNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    return subQueryNode;
+  }
+
+    /*===============================================================================================
+    SET OPERATION SECTION
+   ===============================================================================================*/
+
+  @Override
+  public LogicalNode visitUnion(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
+
+  @Override
+  public LogicalNode visitExcept(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
+
+  @Override
+  public LogicalNode visitIntersect(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
+
+  private LogicalNode buildSetPlan(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+
+    // 1. Init Phase
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Left Child Plan
+    ////////////////////////////////////////////////////////
+    QueryBlock leftBlock = context.plan.getBlockByExpr(setOperation.getLeft());
+    PlanContext leftContext = new PlanContext(context, leftBlock);
+    stack.push(setOperation);
+    LogicalNode leftChild = visit(leftContext, new Stack<Expr>(), setOperation.getLeft());
+    stack.pop();
+    // Connect left child and current blocks
+    context.plan.connectBlocks(leftContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Right Child Plan
+    ////////////////////////////////////////////////////////
+    QueryBlock rightBlock = context.plan.getBlockByExpr(setOperation.getRight());
+    PlanContext rightContext = new PlanContext(context, rightBlock);
+    stack.push(setOperation);
+    LogicalNode rightChild = visit(rightContext, new Stack<Expr>(), setOperation.getRight());
+    stack.pop();
+    // Connect right child and current blocks
+    context.plan.connectBlocks(rightContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
+
+    BinaryNode setOp;
+    if (setOperation.getType() == OpType.Union) {
+      setOp = block.getNodeFromExpr(setOperation);
+    } else if (setOperation.getType() == OpType.Except) {
+      setOp = block.getNodeFromExpr(setOperation);
+    } else if (setOperation.getType() == OpType.Intersect) {
+      setOp = block.getNodeFromExpr(setOperation);
+    } else {
+      throw new VerifyException("Invalid Type: " + setOperation.getType());
+    }
+    setOp.setLeftChild(leftChild);
+    setOp.setRightChild(rightChild);
+
+    // An union statement can be derived from two query blocks.
+    // For one union statement between both relations, we can ensure that each corresponding data domain of both
+    // relations are the same. However, if necessary, the schema of left query block will be used as a base schema.
+    Target [] leftStrippedTargets = PlannerUtil.stripTarget(
+        PlannerUtil.schemaToTargets(leftBlock.getRoot().getOutSchema()));
+
+    setOp.setInSchema(leftChild.getOutSchema());
+    Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
+    setOp.setOutSchema(outSchema);
+
+    return setOp;
+  }
+
+  /*===============================================================================================
+    INSERT SECTION
+   ===============================================================================================*/
+
+  public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode subQuery = super.visitInsert(context, stack, expr);
+    stack.pop();
+
+    InsertNode insertNode = context.queryBlock.getNodeFromExpr(expr);
+    insertNode.setOverwrite(expr.isOverwrite());
+    insertNode.setSubQuery(subQuery);
+
+    if (expr.hasTableName()) { // INSERT (OVERWRITE) INTO TABLE ...
+      return buildInsertIntoTablePlan(context, insertNode, expr);
+    } else if (expr.hasLocation()) { // INSERT (OVERWRITE) INTO LOCATION ...
+      return buildInsertIntoLocationPlan(context, insertNode, expr);
+    } else {
+      throw new IllegalStateException("Invalid Query");
+    }
+  }
+
+  /**
+   * Builds a InsertNode with a target table.
+   *
+   * ex) INSERT OVERWRITE INTO TABLE ...
+   * <br />
+   *
+   * We use the following terms, such target table, target column
+   * <pre>
+   * INSERT INTO    TB_NAME        (col1, col2)          SELECT    c1,   c2        FROM ...
+   *                ^^^^^^^        ^^^^^^^^^^^^                  ^^^^^^^^^^^^
+   *             target table   target columns (or schema)     projected columns (or schema)
+   * </pre>
+   */
+  private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
+      throws PlanningException {
+    // Get and set a target table
+    TableDesc desc = catalog.getTableDesc(context.session.getCurrentDatabase(), expr.getTableName());
+    insertNode.setTargetTable(desc);
+
+    //
+    // When we use 'INSERT (OVERWIRTE) INTO TABLE statements, there are two cases.
+    //
+    // First, when a user specified target columns
+    // INSERT (OVERWRITE)? INTO table_name (col1 type, col2 type) SELECT ...
+    //
+    // Second, when a user do not specified target columns
+    // INSERT (OVERWRITE)? INTO table_name SELECT ...
+    //
+    // In the former case is, target columns' schema and corresponding projected columns' schema
+    // must be equivalent or be available to cast implicitly.
+    //
+    // In the later case, the target table's schema and projected column's
+    // schema of select clause can be different to each other. In this case,
+    // we use only a sequence of preceding columns of target table's schema
+    // as target columns.
+    //
+    // For example, consider a target table and an 'insert into' query are give as follows:
+    //
+    // CREATE TABLE TB1                  (col1 int,  col2 int, col3 long);
+    //                                      ||          ||
+    // INSERT OVERWRITE INTO TB1 SELECT  order_key,  part_num               FROM ...
+    //
+    // In this example, only col1 and col2 are used as target columns.
+
+    if (expr.hasTargetColumns()) { // when a user specified target columns
+
+      if (expr.getTargetColumns().length > insertNode.getChild().getOutSchema().size()) {
+        throw new PlanningException("Target columns and projected columns are mismatched to each other");
+      }
+
+      // See PreLogicalPlanVerifier.visitInsert.
+      // It guarantees that the equivalence between the numbers of target and projected columns.
+      ScanNode scanNode = context.plan.createNode(ScanNode.class);
+      scanNode.init(desc);
+      context.queryBlock.addRelation(scanNode);
+      String [] targets = expr.getTargetColumns();
+      Schema targetColumns = new Schema();
+      for (int i = 0; i < targets.length; i++) {
+        Column targetColumn = context.plan.resolveColumn(context.queryBlock, new ColumnReferenceExpr(targets[i]));
+        targetColumns.addColumn(targetColumn);
+      }
+      insertNode.setTargetSchema(targetColumns);
+      insertNode.setOutSchema(targetColumns);
+      buildProjectedInsert(insertNode);
+
+    } else { // when a user do not specified target columns
+
+      // The output schema of select clause determines the target columns.
+      Schema tableSchema = desc.getLogicalSchema();
+      Schema projectedSchema = insertNode.getChild().getOutSchema();
+
+      Schema targetColumns = new Schema();
+      for (int i = 0; i < projectedSchema.size(); i++) {
+        targetColumns.addColumn(tableSchema.getColumn(i));
+      }
+      insertNode.setTargetSchema(targetColumns);
+      buildProjectedInsert(insertNode);
+    }
+
+    if (desc.hasPartition()) {
+      insertNode.setPartitionMethod(desc.getPartitionMethod());
+    }
+    return insertNode;
+  }
+
+  private void buildProjectedInsert(InsertNode insertNode) {
+    Schema tableSchema = insertNode.getTableSchema();
+    Schema targetColumns = insertNode.getTargetSchema();
+
+    ProjectionNode projectionNode = insertNode.getChild();
+
+    // Modifying projected columns by adding NULL constants
+    // It is because that table appender does not support target columns to be written.
+    List<Target> targets = TUtil.newList();
+    for (int i = 0, j = 0; i < tableSchema.size(); i++) {
+      Column column = tableSchema.getColumn(i);
+
+      if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
+        targets.add(projectionNode.getTargets()[j++]);
+      } else {
+        targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
+      }
+    }
+    projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    insertNode.setInSchema(projectionNode.getOutSchema());
+    insertNode.setOutSchema(projectionNode.getOutSchema());
+    insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  /**
+   * Build a InsertNode with a location.
+   *
+   * ex) INSERT OVERWRITE INTO LOCATION 'hdfs://....' ..
+   */
+  private InsertNode buildInsertIntoLocationPlan(PlanContext context, InsertNode insertNode, Insert expr) {
+    // INSERT (OVERWRITE)? INTO LOCATION path (USING file_type (param_clause)?)? query_expression
+
+    Schema childSchema = insertNode.getChild().getOutSchema();
+    insertNode.setInSchema(childSchema);
+    insertNode.setOutSchema(childSchema);
+    insertNode.setTableSchema(childSchema);
+    insertNode.setTargetLocation(new Path(expr.getLocation()));
+
+    if (expr.hasStorageType()) {
+      insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+    }
+    if (expr.hasParams()) {
+      Options options = new Options();
+      options.putAll(expr.getParams());
+      insertNode.setOptions(options);
+    }
+    return insertNode;
+  }
+
+  /*===============================================================================================
+    Data Definition Language (DDL) SECTION
+   ===============================================================================================*/
+
+  @Override
+  public LogicalNode visitCreateDatabase(PlanContext context, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    CreateDatabaseNode createDatabaseNode = context.queryBlock.getNodeFromExpr(expr);
+    createDatabaseNode.init(expr.getDatabaseName(), expr.isIfNotExists());
+    return createDatabaseNode;
+  }
+
+  @Override
+  public LogicalNode visitDropDatabase(PlanContext context, Stack<Expr> stack, DropDatabase expr)
+      throws PlanningException {
+    DropDatabaseNode dropDatabaseNode = context.plan.createNode(DropDatabaseNode.class);
+    dropDatabaseNode.init(expr.getDatabaseName(), expr.isIfExists());
+    return dropDatabaseNode;
+  }
+
+  @Override
+  public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
+      throws PlanningException {
+
+    CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+    createTableNode.setIfNotExists(expr.isIfNotExists());
+
+    // Set a table name to be created.
+    if (CatalogUtil.isFQTableName(expr.getTableName())) {
+      createTableNode.setTableName(expr.getTableName());
+    } else {
+      createTableNode.setTableName(
+          CatalogUtil.buildFQName(context.session.getCurrentDatabase(), expr.getTableName()));
+    }
+
+
+    if (expr.hasStorageType()) { // If storage type (using clause) is specified
+      createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+    } else { // otherwise, default type
+      createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+    }
+
+    // Set default storage properties to be created.
+    Options options = StorageUtil.newPhysicalProperties(createTableNode.getStorageType());
+    if (expr.hasParams()) {
+      options.putAll(expr.getParams());
+    }
+
+    createTableNode.setOptions(options);
+
+    if (expr.hasPartition()) {
+      if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
+        createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
+      } else {
+        throw new PlanningException(String.format("Not supported PartitonType: %s",
+            expr.getPartitionMethod().getPartitionType()));
+      }
+    }
+
+    if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
+      stack.add(expr);
+      LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
+      stack.pop();
+      createTableNode.setChild(subQuery);
+      createTableNode.setInSchema(subQuery.getOutSchema());
+
+      // If the table schema is defined
+      // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
+      if (expr.hasTableElements()) {
+        createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
+        createTableNode.setTableSchema(convertTableElementsSchema(expr.getTableElements()));
+      } else {
+        // if no table definition, the select clause's output schema will be used.
+        // ex) CREATE TABLE tbl AS SELECT ...
+
+        if (expr.hasPartition()) {
+          PartitionMethodDesc partitionMethod = createTableNode.getPartitionMethod();
+
+          Schema queryOutputSchema = subQuery.getOutSchema();
+          Schema partitionExpressionSchema = partitionMethod.getExpressionSchema();
+          if (partitionMethod.getPartitionType() == CatalogProtos.PartitionType.COLUMN &&
+              queryOutputSchema.size() < partitionExpressionSchema.size()) {
+            throw new VerifyException("Partition columns cannot be more than table columns.");
+          }
+          Schema tableSchema = new Schema();
+          for (int i = 0; i < queryOutputSchema.size() - partitionExpressionSchema.size(); i++) {
+            tableSchema.addColumn(queryOutputSchema.getColumn(i));
+          }
+          createTableNode.setOutSchema(tableSchema);
+          createTableNode.setTableSchema(tableSchema);
+        } else {
+          createTableNode.setOutSchema(subQuery.getOutSchema());
+          createTableNode.setTableSchema(subQuery.getOutSchema());
+        }
+      }
+
+      return createTableNode;
+
+    } else { // if CREATE AN EMPTY TABLE
+      Schema tableSchema = convertColumnsToSchema(expr.getTableElements());
+      createTableNode.setTableSchema(tableSchema);
+
+      if (expr.isExternal()) {
+        createTableNode.setExternal(true);
+      }
+
+      if (expr.hasLocation()) {
+        createTableNode.setPath(new Path(expr.getLocation()));
+      }
+
+      return createTableNode;
+    }
+  }
+
+  private PartitionMethodDesc getPartitionMethod(PlanContext context,
+                                                 String tableName,
+                                                 CreateTable.PartitionMethodDescExpr expr) throws PlanningException {
+    PartitionMethodDesc partitionMethodDesc;
+
+    if(expr.getPartitionType() == PartitionType.COLUMN) {
+      CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
+      String partitionExpression = Joiner.on(',').join(partition.getColumns());
+
+      partitionMethodDesc = new PartitionMethodDesc(context.session.getCurrentDatabase(), tableName,
+          CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns()));
+    } else {
+      throw new PlanningException(String.format("Not supported PartitonType: %s", expr.getPartitionType()));
+    }
+    return partitionMethodDesc;
+  }
+
+  /**
+   * It transforms table definition elements to schema.
+   *
+   * @param elements to be transformed
+   * @return schema transformed from table definition elements
+   */
+  private Schema convertColumnsToSchema(ColumnDefinition[] elements) {
+    Schema schema = new Schema();
+
+    for (ColumnDefinition columnDefinition: elements) {
+      schema.addColumn(convertColumn(columnDefinition));
+    }
+
+    return schema;
+  }
+
+  /**
+   * It transforms table definition elements to schema.
+   *
+   * @param elements to be transformed
+   * @return schema transformed from table definition elements
+   */
+  private Schema convertTableElementsSchema(ColumnDefinition[] elements) {
+    Schema schema = new Schema();
+
+    for (ColumnDefinition columnDefinition: elements) {
+      schema.addColumn(convertColumn(columnDefinition));
+    }
+
+    return schema;
+  }
+
+  private Column convertColumn(ColumnDefinition columnDefinition) {
+    return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
+  }
+
+  static TajoDataTypes.DataType convertDataType(DataTypeExpr dataType) {
+    TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
+
+    TajoDataTypes.DataType.Builder builder = TajoDataTypes.DataType.newBuilder();
+    builder.setType(type);
+    if (dataType.hasLengthOrPrecision()) {
+      builder.setLength(dataType.getLengthOrPrecision());
+    }
+    return builder.build();
+  }
+
+
+  @Override
+  public LogicalNode visitDropTable(PlanContext context, Stack<Expr> stack, DropTable dropTable) {
+    DropTableNode dropTableNode = context.queryBlock.getNodeFromExpr(dropTable);
+    String qualified;
+    if (CatalogUtil.isFQTableName(dropTable.getTableName())) {
+      qualified = dropTable.getTableName();
+    } else {
+      qualified = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), dropTable.getTableName());
+    }
+    dropTableNode.init(qualified, dropTable.isIfExists(), dropTable.isPurge());
+    return dropTableNode;
+  }
+
+  public LogicalNode visitAlterTablespace(PlanContext context, Stack<Expr> stack, AlterTablespace alterTablespace) {
+    AlterTablespaceNode alter = context.queryBlock.getNodeFromExpr(alterTablespace);
+    alter.setTablespaceName(alterTablespace.getTablespaceName());
+    alter.setLocation(alterTablespace.getLocation());
+    return alter;
+  }
+
+  @Override
+  public LogicalNode visitAlterTable(PlanContext context, Stack<Expr> stack, AlterTable alterTable) {
+    AlterTableNode alterTableNode = context.queryBlock.getNodeFromExpr(alterTable);
+    alterTableNode.setTableName(alterTable.getTableName());
+    alterTableNode.setNewTableName(alterTable.getNewTableName());
+    alterTableNode.setColumnName(alterTable.getColumnName());
+    alterTableNode.setNewColumnName(alterTable.getNewColumnName());
+
+    if (null != alterTable.getAddNewColumn()) {
+      alterTableNode.setAddNewColumn(convertColumn(alterTable.getAddNewColumn()));
+    }
+    alterTableNode.setAlterTableOpType(alterTable.getAlterTableOpType());
+    return alterTableNode;
+  }
+
+  /*===============================================================================================
+    Util SECTION
+  ===============================================================================================*/
+
+  public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode node) {
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
+
+    if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode node,
+                                                 boolean isTopMostJoin) {
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
+
+    if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+      return false;
+    }
+
+    if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    // When a 'case-when' is used with outer join, the case-when expression must be evaluated
+    // at the topmost join operator.
+    // TODO - It's also valid that case-when is evalauted at the topmost outer operator.
+    //        But, how can we know there is no further outer join operator after this node?
+    if (!checkIfCaseWhenWithOuterJoinBeEvaluated(block, evalNode, isTopMostJoin)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  private static boolean checkIfCaseWhenWithOuterJoinBeEvaluated(QueryBlock block, EvalNode evalNode,
+                                                                 boolean isTopMostJoin) {
+    if (block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER)) {
+      Collection<CaseWhenEval> caseWhenEvals = EvalTreeUtil.findEvalsByType(evalNode, EvalType.CASE);
+      if (caseWhenEvals.size() > 0 && !isTopMostJoin) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * It checks if evalNode can be evaluated at this @{link RelationNode}.
+   */
+  public static boolean checkIfBeEvaluatedAtRelation(QueryBlock block, EvalNode evalNode, RelationNode node) {
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
+
+    // aggregation functions cannot be evaluated in scan node
+    if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+      return false;
+    }
+
+    if (columnRefs.size() > 0 && !node.getTableSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    // Why? - When a {case when} is used with outer join, case when must be evaluated at topmost outer join.
+    if (block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER)) {
+      Collection<CaseWhenEval> found = EvalTreeUtil.findEvalsByType(evalNode, EvalType.CASE);
+      if (found.size() > 0) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  public static boolean checkIfBeEvaluatedAtThis(EvalNode evalNode, LogicalNode node) {
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
+    if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/NamedExprsManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/NamedExprsManager.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/NamedExprsManager.java
new file mode 100644
index 0000000..2fcf3bc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/NamedExprsManager.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.NamedExpr;
+import org.apache.tajo.algebra.OpType;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalType;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * NamedExprsManager manages an expressions used in a query block. All expressions used in a query block must be
+ * added to NamedExprsManager. When an expression is added to NamedExprsManager, NamedExprsManager gives a reference
+ * to the expression. If the expression already has an alias name, it gives the alias name as the reference
+ * to the expression. If the expression does not have any alias, it gives a generated name as the reference to the
+ * expression. Usually, predicates in WHERE clause, expressions in GROUP-BY, ORDER-BY, LIMIT clauses are not given
+ * any alias name. Those reference names are used to identify an individual expression.
+ *
+ * NamedExprsManager only keeps unique expressions. Since expressions in a query block can be duplicated,
+ * one or more reference names can point one expressions. Due to this process, it naturally removes duplicated
+ * expression.
+ *
+ * As we mentioned above, one or more reference names can indicate one expression. Primary names are used for
+ * representing expressions. A primary name of an expression indicates the reference obtained when
+ * the expression is added firstly. All output schemas uses only primary names of expressions.
+ *
+ * Each expression that NamedExprsManager keeps has an boolean state to indicate whether the expression is evaluated
+ * or not. The <code>evaluated</code> state means that upper logical operators can access this expression like a column
+ * reference. For it, the reference name is used to access this expression like a column reference,
+ * The evaluated state is set with an EvalNode which is an annotated expression.
+ * {@link #getTarget(String)} returns EvalNodes by a reference name.
+ */
+public class NamedExprsManager {
+  /** a sequence id */
+  private int sequenceId = 0;
+
+  /** Map: Name -> ID. Two or more different names can indicates the same id. */
+  private LinkedHashMap<String, Integer> nameToIdMap = Maps.newLinkedHashMap();
+
+  /** Map; ID <-> EvalNode */
+  private BiMap<Integer, EvalNode> idToEvalMap = HashBiMap.create();
+
+  /** Map: ID -> Names */
+  private LinkedHashMap<Integer, List<String>> idToNamesMap = Maps.newLinkedHashMap();
+
+  /** Map: ID -> Expr */
+  private BiMap<Integer, Expr> idToExprBiMap = HashBiMap.create();
+
+  /** Map; Name -> Boolean (if it is resolved or not) */
+  private LinkedHashMap<Integer, Boolean> evaluationStateMap = Maps.newLinkedHashMap();
+
+  /** Map: Alias Name <-> Original Name */
+  private BiMap<String, String> aliasedColumnMap = HashBiMap.create();
+
+  private LogicalPlan plan;
+
+  public NamedExprsManager(LogicalPlan plan) {
+    this.plan = plan;
+  }
+
+  private int getNextId() {
+    return sequenceId++;
+  }
+
+  /**
+   * Check whether the expression corresponding to a given name was evaluated.
+   *
+   * @param name The name of a certain expression to be checked
+   * @return true if resolved. Otherwise, false.
+   */
+  public boolean isEvaluated(String name) {
+    if (nameToIdMap.containsKey(name)) {
+      int refId = nameToIdMap.get(name);
+      return evaluationStateMap.containsKey(refId) && evaluationStateMap.get(refId);
+    } else {
+      return false;
+    }
+  }
+
+  public boolean contains(String name) {
+    return nameToIdMap.containsKey(name);
+  }
+
+  public boolean contains(Expr expr) {
+    return idToExprBiMap.inverse().containsKey(expr);
+  }
+
+  private Expr getExpr(String name) {
+    return idToExprBiMap.get(nameToIdMap.get(name));
+  }
+
+  public NamedExpr getNamedExpr(String name) {
+    String normalized = name;
+    return new NamedExpr(getExpr(name), normalized);
+  }
+
+  public boolean isAliased(String name) {
+    return aliasedColumnMap.containsKey(name);
+  }
+
+  public String getAlias(String originalName) {
+    return aliasedColumnMap.get(originalName);
+  }
+
+  public boolean isAliasedName(String aliasName) {
+    return aliasedColumnMap.inverse().containsKey(aliasName);
+  }
+
+  public String getOriginalName(String aliasName) {
+    return aliasedColumnMap.inverse().get(aliasName);
+  }
+
+  /**
+   * Adds an expression and returns a reference name.
+   */
+  public String addExpr(Expr expr) throws PlanningException {
+    if (idToExprBiMap.inverse().containsKey(expr)) {
+      int refId = idToExprBiMap.inverse().get(expr);
+      return idToNamesMap.get(refId).get(0);
+    }
+
+    String generatedName = plan.generateUniqueColumnName(expr);
+    return addExpr(expr, generatedName);
+  }
+
+  /**
+   * Adds an expression with an alias name and returns a reference name.
+   * It specifies the alias as an reference name.
+   */
+  public String addExpr(Expr expr, String alias) throws PlanningException {
+
+    // if this name already exists, just returns the name.
+    if (nameToIdMap.containsKey(alias)) {
+      return alias;
+    }
+
+    // if the name is first
+    int refId;
+    if (idToExprBiMap.inverse().containsKey(expr)) {
+      refId = idToExprBiMap.inverse().get(expr);
+    } else {
+      refId = getNextId();
+      idToExprBiMap.put(refId, expr);
+    }
+
+    nameToIdMap.put(alias, refId);
+    evaluationStateMap.put(refId, false);
+
+    // add the entry to idToNames map
+    TUtil.putToNestedList(idToNamesMap, refId, alias);
+
+    return alias;
+  }
+
+  /**
+   * Adds an expression and returns a reference name.
+   * If an alias is given, it specifies the alias as an reference name.
+   */
+  public String addNamedExpr(NamedExpr namedExpr) throws PlanningException {
+    if (namedExpr.hasAlias()) {
+      return addExpr(namedExpr.getExpr(), namedExpr.getAlias());
+    } else {
+      return addExpr(namedExpr.getExpr());
+    }
+  }
+
+  /**
+   * Adds a list of expressions and returns a list of reference names.
+   * If some NamedExpr has an alias, NamedExprsManager specifies the alias for the NamedExpr.
+   */
+  public String [] addNamedExprArray(@Nullable Collection<NamedExpr> namedExprs) throws PlanningException {
+    if (namedExprs != null && namedExprs.size() > 0) {
+      String [] names = new String[namedExprs.size()];
+      int i = 0;
+      for (NamedExpr target : namedExprs) {
+        names[i++] = addNamedExpr(target);
+      }
+      return names;
+    } else {
+      return null;
+    }
+  }
+
+  public Collection<NamedExpr> getAllNamedExprs() {
+    List<NamedExpr> namedExprList = Lists.newArrayList();
+    for (Map.Entry<Integer, Expr> entry: idToExprBiMap.entrySet()) {
+      namedExprList.add(new NamedExpr(entry.getValue(), idToNamesMap.get(entry.getKey()).get(0)));
+    }
+    return namedExprList;
+  }
+
+  /**
+   * It marks the expression identified by the reference name as <code>evaluated</code>.
+   * In addition, it adds an EvanNode for the expression identified by the reference.
+   *
+   * @param referenceName The reference name to be marked as 'evaluated'.
+   * @param evalNode EvalNode to be added.
+   */
+  public void markAsEvaluated(String referenceName, EvalNode evalNode) throws PlanningException {
+    String normalized = referenceName;
+
+    int refId = nameToIdMap.get(normalized);
+    evaluationStateMap.put(refId, true);
+    idToEvalMap.put(refId, evalNode);
+
+    String originalName = checkAndGetIfAliasedColumn(normalized);
+    if (originalName != null) {
+      aliasedColumnMap.put(originalName, normalized);
+    }
+  }
+
+  /**
+   * It returns an original column name if it is aliased column reference.
+   * Otherwise, it will return NULL.
+   */
+  public String checkAndGetIfAliasedColumn(String name) {
+    Expr expr = getExpr(name);
+    if (expr != null && expr.getType() == OpType.Column) {
+      ColumnReferenceExpr column = (ColumnReferenceExpr) expr;
+      if (!column.getCanonicalName().equals(name)) {
+        return column.getCanonicalName();
+      }
+    }
+    return null;
+  }
+
+  public Target getTarget(String name) {
+    return getTarget(name, false);
+  }
+
+  /**
+   * It checks if a given name is the primary name.
+   *
+   * @See {@link NamedExprsManager}
+   * @see {@link NamedExprsManager#getPrimaryName}
+   *
+   * @param id The expression id
+   * @param name The name to be checked if it is primary name.
+   * @return The primary name
+   */
+  private boolean isPrimaryName(int id, String name) {
+    return idToNamesMap.get(id).get(0).equals(name);
+  }
+
+  /**
+   * One or more reference names can indicate one expression. Primary names are used for
+   * representing expressions. A primary name of an expression indicates the reference obtained when
+   * the expression is added firstly. All output schemas uses only primary names of expressions.
+   *
+   * @param id The expression id
+   * @return The primary name
+   */
+  private String getPrimaryName(int id) {
+    return idToNamesMap.get(id).get(0);
+  }
+
+  /**
+   * get a Target instance. A target consists of a reference name and an EvalNode corresponding to the reference name.
+   * According to evaluation state, it returns different EvalNodes.
+   * If the expression corresponding to the reference name is evaluated, it just returns {@link FieldEval}
+   * (i.e., a column reference). Otherwise, it returns the original EvalNode of the expression.
+   *
+   * @param referenceName The reference name to get EvalNode
+   * @param unevaluatedForm If TRUE, it always return the annotated EvalNode of the expression.
+   * @return
+   */
+  public Target getTarget(String referenceName, boolean unevaluatedForm) {
+    String normalized = referenceName;
+    int refId = nameToIdMap.get(normalized);
+
+    if (!unevaluatedForm && evaluationStateMap.containsKey(refId) && evaluationStateMap.get(refId)) {
+      EvalNode evalNode = idToEvalMap.get(refId);
+
+      // If the expression is already evaluated, it should use the FieldEval to access a field value.
+      // But, if this reference name is not primary name, it cannot use the reference name.
+      // It changes the given reference name to the primary name.
+      if (isEvaluated(normalized) && !isPrimaryName(refId, referenceName)) {
+        return new Target(new FieldEval(getPrimaryName(refId),evalNode.getValueType()), referenceName);
+      }
+
+      EvalNode referredEval;
+      if (evalNode.getType() == EvalType.CONST) {
+        referredEval = evalNode;
+      } else {
+        referredEval = new FieldEval(idToNamesMap.get(refId).get(0), evalNode.getValueType());
+      }
+      return new Target(referredEval, referenceName);
+
+    } else {
+      if (idToEvalMap.containsKey(refId)) {
+        return new Target(idToEvalMap.get(refId), referenceName);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public String toString() {
+    return "unevaluated=" + nameToIdMap.size() + ", evaluated=" + idToEvalMap.size()
+        + ", renamed=" + aliasedColumnMap.size();
+  }
+
+  /**
+   * It returns an iterator for unevaluated NamedExprs.
+   */
+  public Iterator<NamedExpr> getIteratorForUnevaluatedExprs() {
+    return new UnevaluatedIterator();
+  }
+
+  public class UnevaluatedIterator implements Iterator<NamedExpr> {
+    private final Iterator<NamedExpr> iterator;
+
+    public UnevaluatedIterator() {
+      List<NamedExpr> unEvaluatedList = TUtil.newList();
+      for (Integer refId: idToNamesMap.keySet()) {
+        String name = idToNamesMap.get(refId).get(0);
+        if (!isEvaluated(name)) {
+          Expr expr = idToExprBiMap.get(refId);
+          unEvaluatedList.add(new NamedExpr(expr, name));
+        }
+      }
+      if (unEvaluatedList.size() == 0) {
+        iterator = null;
+      } else {
+        iterator = unEvaluatedList.iterator();
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator != null && iterator.hasNext();
+    }
+
+    @Override
+    public NamedExpr next() {
+      return iterator.next();
+    }
+
+    @Override
+    public void remove() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
new file mode 100644
index 0000000..ebe47b4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.exception.InternalException;
+
+/**
+ * This class generates a physical execution plan.
+ */
+public interface PhysicalPlanner {
+  public PhysicalExec createPlan(TaskAttemptContext context,
+                                 LogicalNode logicalPlan)
+      throws InternalException;
+}