You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/17 10:23:37 UTC

[07/12] TAJO-501: Rewrite the projection part of logical planning.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
new file mode 100644
index 0000000..c97a30a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprFinder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+class ExprFinder extends SimpleAlgebraVisitor<ExprFinder.Context, Object> {
+
+  static class Context {
+    Set<Expr> set = new HashSet<Expr>();
+    OpType targetType;
+
+    Context(OpType type) {
+      this.targetType = type;
+    }
+  }
+
+  public static <T extends Expr> Set<T> finds(Expr expr, OpType type) {
+    Context context = new Context(type);
+    ExprFinder finder = new ExprFinder();
+    Stack<Expr> stack = new Stack<Expr>();
+    stack.push(expr);
+    try {
+      finder.visit(context, new Stack<Expr>(), expr);
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+    stack.pop();
+    return (Set<T>) context.set;
+  }
+
+  public Object visit(Context ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
+    if (expr instanceof UnaryOperator) {
+      preHook(ctx, stack, expr);
+      visitUnaryOperator(ctx, stack, (UnaryOperator) expr);
+      postHook(ctx, stack, expr, null);
+    } else if (expr instanceof BinaryOperator) {
+      preHook(ctx, stack, expr);
+      visitBinaryOperator(ctx, stack, (BinaryOperator) expr);
+      postHook(ctx, stack, expr, null);
+    } else {
+      super.visit(ctx, stack, expr);
+    }
+
+    if (ctx.targetType == expr.getType()) {
+      ctx.set.add(expr);
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
new file mode 100644
index 0000000..ca509c4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * ExprNormalizer performs three kinds of work:
+ *
+ * <h3>1. Duplication Removal.</h3>
+ *
+ * For example, assume a simple query as follows:
+ * <pre>
+ *   select price * rate as total_price, ..., order by price * rate
+ * </pre>
+ *
+ * The expression <code>price * rate</code> appears in both the select list and the order by clause.
+ * In such cases, ExprNormalizer removes the duplicates and replaces each of them with a single reference.
+ * In this example, ExprNormalizer replaces <code>price * rate</code> with a reference to total_price.
+ *
+ * <h3>2. Dissection of Expression</h3>
+ *
+ * An expression can be complex, mixing scalar and aggregation expressions.
+ * For example, assume an aggregation query as follows:
+ * <pre>
+ *   select sum(price * rate) * (1 - avg(discount_rate)), ...
+ * </pre>
+ *
+ * In this case, ExprNormalizer dissects the expression 'sum(price * rate) * (1 - avg(discount_rate))'
+ * into the following expressions:
+ * <ul>
+ *   <li>$1 = price * rate</li>
+ *   <li>$2 = sum($1)</li>
+ *   <li>$3 = avg(discount_rate)</li>
+ *   <li>$4 = $2 * (1 - $3)</li>
+ * </ul>
+ *
+ * This has two main advantages. First, it makes complex expressions easier to evaluate across multiple
+ * physical executors. Second, it gives more opportunities to remove duplicated expressions.
+ *
+ * <h3>3. Name Normalization</h3>
+ *
+ * Users can use qualified column names, unqualified column names or aliased column references.
+ *
+ * Consider the following example:
+ *
+ * <pre>
+ *   select rate_a as total_rate, rate_a * 100, table1.rate_a, ... WHERE total_rate * 100
+ * </pre>
+ *
+ * <code>total_rate</code>, <code>rate_a</code>, and <code>table1.rate_a</code> all refer to the same column, but
+ * they are written in different forms. Because the forms differ, duplication removal can be hard.
+ *
+ * To solve this problem, ExprNormalizer normalizes all column references to qualified names while preserving
+ * what each reference points to.
+ */
+class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedResult, Object> {
+
+  public static class ExprNormalizedResult {
+    private final LogicalPlan plan;
+    private final LogicalPlan.QueryBlock block;
+
+    Expr baseExpr; // outmost expressions, which can includes one or more references of the results of aggregation
+                   // function.
+    List<NamedExpr> aggExprs = new ArrayList<NamedExpr>(); // aggregation functions
+    List<NamedExpr> scalarExprs = new ArrayList<NamedExpr>(); // scalar expressions which can be referred
+
+    private ExprNormalizedResult(LogicalPlanner.PlanContext context) {
+      this.plan = context.plan;
+      this.block = context.queryBlock;
+    }
+
+    @Override
+    public String toString() {
+      return baseExpr.toString() + ", agg=" + aggExprs.size() + ", scalar=" + scalarExprs.size();
+    }
+  }
+
+  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr) throws PlanningException {
+    ExprNormalizedResult exprNormalizedResult = new ExprNormalizedResult(context);
+    Stack<Expr> stack = new Stack<Expr>();
+    stack.push(expr);
+    visit(exprNormalizedResult, new Stack<Expr>(), expr);
+    exprNormalizedResult.baseExpr = stack.pop();
+    return exprNormalizedResult;
+  }
+
+  private boolean isAggregationFunction(Expr expr) {
+    return expr.getType() == OpType.GeneralSetFunction || expr.getType() == OpType.CountRowsFunction;
+  }
+
+  @Override
+  public Object visitCaseWhen(ExprNormalizedResult ctx, Stack<Expr> stack, CaseWhenPredicate expr)
+      throws PlanningException {
+    stack.push(expr);
+    for (CaseWhenPredicate.WhenExpr when : expr.getWhens()) {
+      visit(ctx, stack, when.getCondition());
+      visit(ctx, stack, when.getResult());
+
+      if (isAggregationFunction(when.getCondition())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getCondition());
+        ctx.aggExprs.add(new NamedExpr(when.getCondition(), referenceName));
+        when.setCondition(new ColumnReferenceExpr(referenceName));
+      }
+
+      if (isAggregationFunction(when.getResult())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getResult());
+        ctx.aggExprs.add(new NamedExpr(when.getResult(), referenceName));
+        when.setResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+
+    if (expr.hasElseResult()) {
+      visit(ctx, stack, expr.getElseResult());
+      if (isAggregationFunction(expr.getElseResult())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getElseResult());
+        ctx.aggExprs.add(new NamedExpr(expr.getElseResult(), referenceName));
+        expr.setElseResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+    stack.pop();
+    return expr;
+  }
+
+  @Override
+  public Expr visitUnaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
+    super.visitUnaryOperator(ctx, stack, expr);
+    if (isAggregationFunction(expr.getChild())) {
+      // Get an anonymous column name and replace the aggregation function by the column name
+      String refName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), refName));
+      expr.setChild(new ColumnReferenceExpr(refName));
+    }
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitBinaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    super.visitBinaryOperator(ctx, stack, expr);
+
+    ////////////////////////
+    // For Left Term
+    ////////////////////////
+
+    if (isAggregationFunction(expr.getLeft())) {
+      String leftRefName = ctx.block.namedExprsMgr.addExpr(expr.getLeft());
+      ctx.aggExprs.add(new NamedExpr(expr.getLeft(), leftRefName));
+      expr.setLeft(new ColumnReferenceExpr(leftRefName));
+    }
+
+
+    ////////////////////////
+    // For Right Term
+    ////////////////////////
+    if (isAggregationFunction(expr.getRight())) {
+      String rightRefName = ctx.block.namedExprsMgr.addExpr(expr.getRight());
+      ctx.aggExprs.add(new NamedExpr(expr.getRight(), rightRefName));
+      expr.setRight(new ColumnReferenceExpr(rightRefName));
+    }
+
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Function Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitFunction(ExprNormalizedResult ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    for (int i = 0; i < expr.getParams().length; i++) {
+      param = expr.getParams()[i];
+      visit(ctx, stack, param);
+
+      if (isAggregationFunction(param)) {
+        String referenceName = ctx.plan.newGeneratedFieldName(param);
+        ctx.aggExprs.add(new NamedExpr(param, referenceName));
+        expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+      }
+    }
+
+    stack.pop();
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitGeneralSetFunction(ExprNormalizedResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+      throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    for (int i = 0; i < expr.getParams().length; i++) {
+      param = expr.getParams()[i];
+      visit(ctx, stack, param);
+
+      String referenceName = ctx.block.namedExprsMgr.addExpr(param);
+      ctx.scalarExprs.add(new NamedExpr(param, referenceName));
+      expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+    }
+    stack.pop();
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Literal Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitCastExpr(ExprNormalizedResult ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+    super.visitCastExpr(ctx, stack, expr);
+    if (expr.getChild().getType() == OpType.GeneralSetFunction
+        || expr.getChild().getType() == OpType.CountRowsFunction) {
+      String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), referenceName));
+      expr.setChild(new ColumnReferenceExpr(referenceName));
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitColumnReference(ExprNormalizedResult ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
+      throws PlanningException {
+    // normalize column references.
+    if (!expr.hasQualifier()) {
+      if (ctx.block.namedExprsMgr.contains(expr.getCanonicalName())) {
+        NamedExpr namedExpr = ctx.block.namedExprsMgr.getNamedExpr(expr.getCanonicalName());
+        return new ColumnReferenceExpr(namedExpr.getAlias());
+      } else {
+        String normalized = ctx.plan.getNormalizedColumnName(ctx.block, expr);
+        expr.setName(normalized);
+      }
+    }
+    return expr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
index 3c8ee5c..3fb05c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
@@ -22,29 +22,28 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.algebra.Aggregation;
-import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
 
 public class GroupElement implements Cloneable {
-  @Expose
-  private Aggregation.GroupType type;
-  @Expose private Column[] columns;
+  @Expose private Aggregation.GroupType type;
+  @Expose private EvalNode [] groupingSets;
 
   @SuppressWarnings("unused")
   public GroupElement() {
     // for gson
   }
 
-  public GroupElement(Aggregation.GroupType type, Column[] columns) {
+  public GroupElement(Aggregation.GroupType type, EvalNode [] groupingSets) {
     this.type = type;
-    this.columns = columns;
+    this.groupingSets = groupingSets;
   }
 
   public Aggregation.GroupType getType() {
     return this.type;
   }
 
-  public Column [] getColumns() {
-    return this.columns;
+  public EvalNode [] getGroupingSets() {
+    return this.groupingSets;
   }
 
   public String toString() {
@@ -56,9 +55,9 @@ public class GroupElement implements Cloneable {
   public Object clone() throws CloneNotSupportedException {
     GroupElement groups = (GroupElement) super.clone();
     groups.type = type;
-    groups.columns = new Column[columns.length];
-    for (int i = 0; i < columns.length; i++) {
-      groups.columns[i++] = (Column) columns[i].clone();
+    groups.groupingSets = new EvalNode [groupingSets.length];
+    for (int i = 0; i < groupingSets.length; i++) {
+      groups.groupingSets[i++] = (EvalNode) groupingSets[i].clone();
     }
     return groups;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index 4a3e479..ecba05a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -38,18 +38,19 @@ public class InsertNode extends LogicalNode implements Cloneable {
   @Expose private Options options;
   @Expose private LogicalNode subQuery;
 
-
-  public InsertNode(int pid, TableDesc desc, LogicalNode subQuery) {
+  public InsertNode(int pid) {
     super(pid, NodeType.INSERT);
+  }
+
+  public void setTargetTableDesc(TableDesc desc) {
     this.targetTableDesc = desc;
-    this.subQuery = subQuery;
-    this.setInSchema(subQuery.getOutSchema());
-    this.setOutSchema(subQuery.getOutSchema());
   }
 
-  public InsertNode(int pid, Path location, LogicalNode subQuery) {
-    super(pid, NodeType.INSERT);
-    this.path = location;
+  public void setTargetLocation(Path path) {
+    this.path = path;
+  }
+
+  public void setSubQuery(LogicalNode subQuery) {
     this.subQuery = subQuery;
     this.setInSchema(subQuery.getOutSchema());
     this.setOutSchema(subQuery.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index acbace0..b462a30 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -36,6 +36,7 @@ import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
 import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 
+import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.Stack;
 
@@ -77,7 +78,7 @@ public class LogicalOptimizer {
   private void optimizeJoinOrder(LogicalPlan plan, String blockName) throws PlanningException {
     LogicalPlan.QueryBlock block = plan.getBlock(blockName);
 
-    if (block.hasJoinNode()) {
+    if (block.hasNode(NodeType.JOIN)) {
       String originalOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
       double nonOptimizedJoinCost = JoinCostComputer.computeCost(plan, block);
 
@@ -87,12 +88,39 @@ public class LogicalOptimizer {
       // finding join order and restore remain filter order
       FoundJoinOrder order = joinOrderAlgorithm.findBestOrder(plan, block,
           joinGraphContext.joinGraph, joinGraphContext.relationsForProduct);
-      block.setJoinNode(order.getOrderedJoin());
+      JoinNode newJoinNode = order.getOrderedJoin();
 
+      JoinNode old = block.getNode(NodeType.JOIN);
+      JoinTargetCollector collector = new JoinTargetCollector();
+      Set<Target> targets = new LinkedHashSet<Target>();
+      collector.visitJoin(targets, plan, block, old, new Stack<LogicalNode>());
+
+      if (targets.size() == 0) {
+        newJoinNode.setTargets(PlannerUtil.schemaToTargets(old.getOutSchema()));
+      } else {
+        newJoinNode.setTargets(targets.toArray(new Target[targets.size()]));
+      }
+
+      PlannerUtil.replaceNode(plan, block.getRoot(), old, newJoinNode);
       String optimizedOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
+      block.addPlanHistory("Non-optimized join order: " + originalOrder + " (cost: " + nonOptimizedJoinCost + ")");
+      block.addPlanHistory("Optimized join order    : " + optimizedOrder + " (cost: " + order.getCost() + ")");
+    }
+  }
 
-      block.addHistory("Non-optimized join order: " + originalOrder + " (cost: " + nonOptimizedJoinCost + ")");
-      block.addHistory("Optimized join order    : " + optimizedOrder + " (cost: " + order.getCost() + ")");
+  private static class JoinTargetCollector extends BasicLogicalPlanVisitor<Set<Target>, LogicalNode> {
+    @Override
+    public LogicalNode visitJoin(Set<Target> ctx, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                                 Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitJoin(ctx, plan, block, node, stack);
+
+      if (node.hasTargets()) {
+        for (Target target : node.getTargets()) {
+          ctx.add(target);
+        }
+      }
+      return node;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 584fd6e..2519a95 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -19,12 +19,13 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.NotThreadSafe;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.exception.NoSuchColumnException;
 import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
@@ -39,13 +40,12 @@ import java.util.*;
  */
 @NotThreadSafe
 public class LogicalPlan {
-  private final LogicalPlanner planner;
-
   /** the prefix character for virtual tables */
-  public static final char VIRTUAL_TABLE_PREFIX='@';
+  public static final char VIRTUAL_TABLE_PREFIX='#';
+  public static final char NONAMED_COLUMN_PREFIX='?';
   /** it indicates the root block */
   public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
-  public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
+  public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "QB_";
   private int nextPid = 0;
   private Integer noNameBlockId = 0;
   private Integer noNameColumnId = 0;
@@ -54,10 +54,12 @@ public class LogicalPlan {
   private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
   private Map<Integer, LogicalNode> nodeMap = new HashMap<Integer, LogicalNode>();
   private Map<Integer, QueryBlock> queryBlockByPID = new HashMap<Integer, QueryBlock>();
+  private Map<String, String> exprToBlockNameMap = TUtil.newHashMap();
   private SimpleDirectedGraph<String, BlockEdge> queryBlockGraph = new SimpleDirectedGraph<String, BlockEdge>();
 
   /** planning and optimization log */
   private List<String> planingHistory = Lists.newArrayList();
+  LogicalPlanner planner;
 
   public LogicalPlan(LogicalPlanner planner) {
     this.planner = planner;
@@ -79,23 +81,39 @@ public class LogicalPlan {
     return nextPid++;
   }
 
-  public QueryBlock newNoNameBlock() {
+  public QueryBlock newQueryBlock() {
     return newAndGetBlock(NONAME_BLOCK_PREFIX + (noNameBlockId++));
   }
 
-  public String newNonameColumnName(String prefix) {
-    String suffix = noNameColumnId == 0 ? "" : String.valueOf(noNameColumnId);
-    noNameColumnId++;
-    return "?" + prefix + suffix;
+  private String generateFieldName(String prefix) {
+    int sequence = noNameColumnId++;
+    return NONAMED_COLUMN_PREFIX + prefix.toLowerCase() + (sequence > 0 ? "_" + sequence : "");
   }
 
-  /**
-   * Check if a query block exists
-   * @param blockName the query block name to be checked
-   * @return true if exists. Otherwise, false
-   */
-  public boolean existBlock(String blockName) {
-    return queryBlocks.containsKey(blockName);
+  public String newGeneratedFieldName(EvalNode evalNode) {
+    String prefix = evalNode.getName();
+    return generateFieldName(prefix);
+  }
+
+  public String newGeneratedFieldName(Expr expr) {
+    String prefix;
+
+    switch (expr.getType()) {
+    case CountRowsFunction:
+      prefix = "count";
+      break;
+    case GeneralSetFunction:
+      GeneralSetFunctionExpr setFunction = (GeneralSetFunctionExpr) expr;
+      prefix = setFunction.getSignature();
+      break;
+    case Function:
+      FunctionExpr function = (FunctionExpr) expr;
+      prefix = function.getSignature();
+      break;
+    default:
+      prefix = expr.getType().name();
+    }
+    return generateFieldName(prefix);
   }
 
   public QueryBlock getRootBlock() {
@@ -137,6 +155,18 @@ public class LogicalPlan {
     return childBlocks;
   }
 
+  public void mapExprToBlock(Expr expr, String blockName) {
+    exprToBlockNameMap.put(ObjectUtils.identityToString(expr), blockName);
+  }
+
+  public QueryBlock getBlockByExpr(Expr expr) {
+    return getBlock(exprToBlockNameMap.get(ObjectUtils.identityToString(expr)));
+  }
+
+  public String getBlockNameByExpr(Expr expr) {
+    return exprToBlockNameMap.get(ObjectUtils.identityToString(expr));
+  }
+
   public Collection<QueryBlock> getQueryBlocks() {
     return queryBlocks.values();
   }
@@ -145,13 +175,18 @@ public class LogicalPlan {
     return queryBlockGraph;
   }
 
+  public String getNormalizedColumnName(QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+    Column found = resolveColumn(block, columnRef);
+    return found.getQualifiedName();
+  }
+
   /**
    * It resolves a column.
    */
-  public Column resolveColumn(QueryBlock block, LogicalNode currentNode, ColumnReferenceExpr columnRef)
-      throws PlanningException {
+  public Column resolveColumn(QueryBlock block, ColumnReferenceExpr columnRef) throws PlanningException {
 
-    if (columnRef.hasQualifier()) { // if a column referenec is qualified
+    if (columnRef.hasQualifier()) { // if a column reference is qualified
 
       RelationNode relationOp = block.getRelation(columnRef.getQualifier());
 
@@ -159,44 +194,73 @@ public class LogicalPlan {
       if (relationOp == null) {
         // TODO - nested query can only refer outer query block? or not?
         for (QueryBlock eachBlock : queryBlocks.values()) {
-          if (eachBlock.containRelation(columnRef.getQualifier())) {
+          if (eachBlock.existsRelation(columnRef.getQualifier())) {
             relationOp = eachBlock.getRelation(columnRef.getQualifier());
           }
         }
       }
 
+      // If we cannot find any relation against a qualified column name
       if (relationOp == null) {
         throw new NoSuchColumnException(columnRef.getCanonicalName());
       }
 
       Schema schema = relationOp.getTableSchema();
-
       Column column = schema.getColumnByFQN(columnRef.getCanonicalName());
       if (column == null) {
-        throw new VerifyException("ERROR: no such a column '"+ columnRef.getCanonicalName() + "'");
+        throw new NoSuchColumnException(columnRef.getCanonicalName());
       }
 
-      return column;
+      // If the code reaches here, a column has been found.
+      // But, it may have been aliased by a lower logical node.
+      // If the column is aliased, the found name may not be usable in upper nodes.
+
+      // Here, we check whether the column reference is already aliased.
+      // If so, we replace the name with the aliased name.
+      LogicalNode currentNode = block.getCurrentNode();
+
+      // The condition (currentNode.getInSchema().contains(column)) means
+      // the column can be used at the current node. So, we don't need to find an alias name.
+      if (currentNode != null && !currentNode.getInSchema().contains(column)) {
+        List<Column> candidates = TUtil.newList();
+        if (block.namedExprsMgr.isAliased(column.getQualifiedName())) {
+          String alias = block.namedExprsMgr.getAlias(columnRef.getCanonicalName());
+          Column found = resolveColumn(block, new ColumnReferenceExpr(alias));
+          if (found != null) {
+            candidates.add(found);
+          }
+        }
+        if (!candidates.isEmpty()) {
+          return ensureUniqueColumn(candidates);
+        }
+      }
 
+      return column;
     } else { // if a column reference is not qualified
 
-      // if current logical node is available
-      if (currentNode != null && currentNode.getOutSchema() != null) {
-        Column found = currentNode.getOutSchema().getColumnByName(columnRef.getName());
+      // Trying to find the column within the current block
+
+      if (block.getLatestNode() != null) {
+        Column found = block.getLatestNode().getOutSchema().getColumnByName(columnRef.getName());
         if (found != null) {
           return found;
         }
       }
 
-      if (block.getLatestNode() != null) {
-        Column found = block.getLatestNode().getOutSchema().getColumnByName(columnRef.getName());
+      List<Column> candidates = TUtil.newList();
+      // Trying to find columns from aliased references.
+      if (block.namedExprsMgr.isAliased(columnRef.getCanonicalName())) {
+        String originalName = block.namedExprsMgr.getAlias(columnRef.getCanonicalName());
+        Column found = resolveColumn(block, new ColumnReferenceExpr(originalName));
         if (found != null) {
-          return found;
+          candidates.add(found);
         }
       }
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
 
       // Trying to find columns from other relations in the current block
-      List<Column> candidates = TUtil.newList();
       for (RelationNode rel : block.getRelations()) {
         Column found = rel.getOutSchema().getColumnByName(columnRef.getName());
         if (found != null) {
@@ -238,16 +302,6 @@ public class LogicalPlan {
     }
   }
 
-  /**
-   * replace the found column if the column is renamed to an alias name
-   */
-  public Column getColumnOrAliasedColumn(QueryBlock block, Column column) throws PlanningException {
-    if (block.targetListManager.isResolve(column)) {
-      column = block.targetListManager.getResolvedColumn(column);
-    }
-    return column;
-  }
-
   private static Column ensureUniqueColumn(List<Column> candidates)
       throws VerifyException {
     if (candidates.size() == 1) {
@@ -282,9 +336,9 @@ public class LogicalPlan {
         new DirectedGraphCursor<String, BlockEdge>(queryBlockGraph, getRootBlock().getName());
     while(cursor.hasNext()) {
       QueryBlock block = getBlock(cursor.nextBlock());
-      if (block.getPlaningHistory().size() > 0) {
+      if (block.getPlanHistory().size() > 0) {
         sb.append("\n[").append(block.getName()).append("]\n");
-        for (String log : block.getPlaningHistory()) {
+        for (String log : block.getPlanHistory()) {
           sb.append("> ").append(log).append("\n");
         }
       }
@@ -308,7 +362,7 @@ public class LogicalPlan {
             ExplainLogicalPlanVisitor.printDepthString(explainContext.getMaxDepth(), explainContext.explains.pop()));
       }
     } catch (PlanningException e) {
-      e.printStackTrace();
+      throw new RuntimeException(e);
     }
 
     return explains.toString();
@@ -366,41 +420,43 @@ public class LogicalPlan {
   }
 
   public class QueryBlock {
-    private String blockName;
+    private final String blockName;
     private LogicalNode rootNode;
     private NodeType rootType;
-    private Map<String, RelationNode> relations = new HashMap<String, RelationNode>();
-    private Map<OpType, List<Expr>> algebraicExprs = TUtil.newHashMap();
 
-    // changing states
+    // transient states
+    private final Map<String, RelationNode> nameToRelationMap = TUtil.newHashMap();
+    private final Map<OpType, List<Expr>> operatorToExprMap = TUtil.newHashMap();
+    /**
+     * Maps each node type to a node of that type. A block may contain several nodes of the same type, so only the most recently registered one is kept.
+     */
+    private final Map<NodeType, LogicalNode> nodeTypeToNodeMap = TUtil.newHashMap();
+    private final Map<String, LogicalNode> exprToNodeMap = TUtil.newHashMap();
+    final NamedExprsManager namedExprsMgr;
+
+    private LogicalNode currentNode;
     private LogicalNode latestNode;
-    private boolean resolvedGrouping = true;
-    private boolean hasGrouping;
-    private Projectable projectionNode;
-    private GroupbyNode groupingNode;
-    private JoinNode joinNode;
-    private SelectionNode selectionNode;
-    private StoreTableNode storeTableNode;
-    private InsertNode insertNode;
+    private final Set<JoinType> includedJoinTypes = TUtil.newHashSet();
+    /**
+     * True while this query block needs an aggregation phase (implicit or explicit) that has not been added yet.
+     * It starts as false: a new block requires no aggregation until one is explicitly requested.
+     */
+    private boolean aggregationRequired = false;
     private Schema schema;
 
-    TargetListManager targetListManager;
-
     /** It contains a planning log for this block */
-    private List<String> planingHistory = Lists.newArrayList();
+    private final List<String> planingHistory = Lists.newArrayList();
+    /** It is for debugging or unit tests */
+    private Target [] unresolvedTargets;
 
     public QueryBlock(String blockName) {
       this.blockName = blockName;
+      this.namedExprsMgr = new NamedExprsManager(LogicalPlan.this);
     }
 
     public String getName() {
       return blockName;
     }
 
-    public boolean hasRoot() {
-      return rootNode != null;
-    }
-
     public void refresh() {
       setRoot(rootNode);
     }
@@ -411,7 +467,6 @@ public class LogicalPlan {
         LogicalRootNode rootNode = (LogicalRootNode) blockRoot;
         rootType = rootNode.getChild().getType();
       }
-      queryBlockByPID.put(blockRoot.getPID(), this);
     }
 
     public <NODE extends LogicalNode> NODE getRoot() {
@@ -422,20 +477,32 @@ public class LogicalPlan {
       return rootType;
     }
 
-    public boolean containRelation(String name) {
-      return relations.containsKey(PlannerUtil.normalizeTableName(name));
+    public Target [] getUnresolvedTargets() {
+      return unresolvedTargets;
     }
 
-    public void addRelation(RelationNode relation) {
-      relations.put(PlannerUtil.normalizeTableName(relation.getCanonicalName()), relation);
+    public void setUnresolvedTargets(Target [] unresolvedTargets) {
+      this.unresolvedTargets = unresolvedTargets;
+    }
+
+    public boolean existsRelation(String name) {
+      return nameToRelationMap.containsKey(PlannerUtil.normalizeTableName(name));
     }
 
     public RelationNode getRelation(String name) {
-      return relations.get(PlannerUtil.normalizeTableName(name));
+      return nameToRelationMap.get(PlannerUtil.normalizeTableName(name));
+    }
+
+    public void addRelation(RelationNode relation) {
+      nameToRelationMap.put(PlannerUtil.normalizeTableName(relation.getCanonicalName()), relation);
     }
 
     public Collection<RelationNode> getRelations() {
-      return this.relations.values();
+      return this.nameToRelationMap.values();
+    }
+
+    public boolean hasTableExpression() {
+      return this.nameToRelationMap.size() > 0;
     }
 
     public void setSchema(Schema schema) {
@@ -446,8 +513,22 @@ public class LogicalPlan {
       return schema;
     }
 
-    public boolean hasTableExpression() {
-      return this.relations.size() > 0;
+    public NamedExprsManager getNamedExprsManager() {
+      return namedExprsMgr;
+    }
+
+    /**
+     * Points the current node at the logical node registered for the given expr.
+     * A RelationList expr is skipped and leaves the current node unchanged.
+     *
+     * @param expr the algebra expression being processed
+     * @throws PlanningException if no node has been registered for the expr
+     */
+    public void updateCurrentNode(Expr expr) throws PlanningException {
+      if (expr.getType() == OpType.RelationList) {
+        return; // a relation list is a virtual expr
+      }
+
+      this.currentNode = exprToNodeMap.get(ObjectUtils.identityToString(expr));
+      if (this.currentNode == null) {
+        throw new PlanningException("Unregistered Algebra Expression: " + expr.getType());
+      }
+    }
+
+    public <T extends LogicalNode> T getCurrentNode() {
+      return (T) this.currentNode;
     }
 
     public void updateLatestNode(LogicalNode node) {
@@ -459,384 +540,94 @@ public class LogicalPlan {
     }
 
     public void setAlgebraicExpr(Expr expr) {
-      TUtil.putToNestedList(algebraicExprs, expr.getType(), expr);
+      TUtil.putToNestedList(operatorToExprMap, expr.getType(), expr);
     }
 
     public boolean hasAlgebraicExpr(OpType opType) {
-      return algebraicExprs.containsKey(opType);
+      return operatorToExprMap.containsKey(opType);
     }
 
     public <T extends Expr> List<T> getAlgebraicExpr(OpType opType) {
-      return (List<T>) algebraicExprs.get(opType);
+      return (List<T>) operatorToExprMap.get(opType);
     }
 
     public <T extends Expr> T getSingletonExpr(OpType opType) {
       if (hasAlgebraicExpr(opType)) {
-        return (T) algebraicExprs.get(opType).get(0);
+        return (T) operatorToExprMap.get(opType).get(0);
       } else {
         return null;
       }
     }
 
-    public boolean hasProjection() {
-      return hasAlgebraicExpr(OpType.Projection);
-    }
-
-    public Projection getProjection() {
-      return getSingletonExpr(OpType.Projection);
-    }
-
-    public boolean hasHaving() {
-      return hasAlgebraicExpr(OpType.Having);
-    }
-
-    public Having getHaving() {
-      return getSingletonExpr(OpType.Having);
-    }
-
-    public void setProjectionNode(Projectable node) {
-      this.projectionNode = node;
-    }
-
-    public Projectable getProjectionNode() {
-      return this.projectionNode;
+    public boolean hasNode(NodeType nodeType) {
+      return nodeTypeToNodeMap.containsKey(nodeType);
     }
 
-    public boolean isGroupingResolved() {
-      return this.resolvedGrouping;
-    }
-
-    public void resolveGroupingRequired() {
-      this.resolvedGrouping = true;
-    }
-
-    public void setHasGrouping() {
-      hasGrouping = true;
-      resolvedGrouping = false;
-    }
+    public void registerNode(LogicalNode node) {
+      // id -> node
+      nodeMap.put(node.getPID(), node);
 
-    public boolean hasGrouping() {
-      return hasGrouping || hasGroupbyNode();
-    }
+      // So, this is only for filter, groupby, sort, limit, projection, which exists once at a query block.
+      nodeTypeToNodeMap.put(node.getType(), node);
 
-    public boolean hasGroupbyNode() {
-      return this.groupingNode != null;
+      queryBlockByPID.put(node.getPID(), this);
     }
 
-    public void setGroupbyNode(GroupbyNode groupingNode) {
-      this.groupingNode = groupingNode;
+    public <T extends LogicalNode> T getNode(NodeType nodeType) {
+      return (T) nodeTypeToNodeMap.get(nodeType);
     }
 
-    public GroupbyNode getGroupbyNode() {
-      return this.groupingNode;
+    // expr -> node
+    public void registerExprWithNode(Expr expr, LogicalNode node) {
+      exprToNodeMap.put(ObjectUtils.identityToString(expr), node);
     }
 
-    public boolean hasJoinNode() {
-      return joinNode != null;
+    public <T extends LogicalNode> T getNodeFromExpr(Expr expr) {
+      return (T) exprToNodeMap.get(ObjectUtils.identityToString(expr));
     }
 
     /**
-     * @return the topmost JoinNode instance
+     * This flag can be changed as a plan is generated.
+     *
+     * True value means that this query should have aggregation phase. If aggregation plan is added to this block,
+     * it becomes false because it doesn't need aggregation phase anymore. It is usually used to add aggregation
+     * phase from SELECT statement without group-by clause.
+     *
+     * @return True if aggregation is needed but this query hasn't had aggregation phase.
      */
-    public JoinNode getJoinNode() {
-      return joinNode;
-    }
-
-    public void setJoinNode(JoinNode node) {
-      if (joinNode == null || latestNode == node) {
-        this.joinNode = node;
-      } else {
-        PlannerUtil.replaceNode(LogicalPlan.this, latestNode, this.joinNode, node);
-      }
-    }
-
-    public boolean hasSelectionNode() {
-      return this.selectionNode != null;
-    }
-
-    public void setSelectionNode(SelectionNode selectionNode) {
-      this.selectionNode = selectionNode;
+    public boolean isAggregationRequired() {
+      return this.aggregationRequired;
     }
 
-    public SelectionNode getSelectionNode() {
-      return selectionNode;
-    }
-
-    public boolean hasStoreTableNode() {
-      return this.storeTableNode != null;
-    }
-
-    public void setStoreTableNode(StoreTableNode storeTableNode) {
-      this.storeTableNode = storeTableNode;
-    }
-
-    public StoreTableNode getStoreTableNode() {
-      return this.storeTableNode;
+    /**
+     * Clears the aggregation-required flag. It has to be called after an aggregation phase is added to this
+     * block, so that {@link #isAggregationRequired()} no longer reports a pending aggregation.
+     */
+    public void unsetAggregationRequire() {
+      this.aggregationRequired = false;
     }
 
-    public boolean hasInsertNode() {
-      return this.insertNode != null;
+    /**
+     * Raises the aggregation-required flag: this block needs an aggregation phase that has not been added yet.
+     */
+    public void setAggregationRequire() {
+      aggregationRequired = true;
     }
 
-    public InsertNode getInsertNode() {
-      return this.insertNode;
+    public boolean containsJoinType(JoinType joinType) {
+      return includedJoinTypes.contains(joinType);
     }
 
-    public void setInsertNode(InsertNode insertNode) {
-      this.insertNode = insertNode;
+    public void addJoinType(JoinType joinType) {
+      includedJoinTypes.add(joinType);
     }
 
-    public List<String> getPlaningHistory() {
+    public List<String> getPlanHistory() {
       return planingHistory;
     }
 
-    public void addHistory(String history) {
+    public void addPlanHistory(String history) {
       this.planingHistory.add(history);
     }
 
-    public boolean postVisit(LogicalNode node, Stack<OpType> path) {
-      if (nodeMap.containsKey(node.getPID())) {
-        return false;
-      }
-
-      nodeMap.put(node.getPID(), node);
-      updateLatestNode(node);
-
-      // if an added operator is a relation, add it to relation set.
-      switch (node.getType()) {
-        case STORE:
-          setStoreTableNode((StoreTableNode) node);
-          break;
-
-        case SCAN:
-          ScanNode relationOp = (ScanNode) node;
-          addRelation(relationOp);
-          break;
-
-        case GROUP_BY:
-          resolveGroupingRequired();
-          setGroupbyNode((GroupbyNode) node);
-          break;
-
-        case JOIN:
-          setJoinNode((JoinNode) node);
-          break;
-
-        case SELECTION:
-          setSelectionNode((SelectionNode) node);
-          break;
-
-        case INSERT:
-          setInsertNode((InsertNode) node);
-          break;
-
-        case TABLE_SUBQUERY:
-          TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) node;
-          addRelation(tableSubQueryNode);
-          break;
-      }
-
-      // if this node is the topmost
-      if (path.size() == 0) {
-        setRoot(node);
-      }
-
-      return true;
-    }
-
     public String toString() {
       return blockName;
     }
-
-    ///////////////////////////////////////////////////////////////////////////
-    //                 Target List Management Methods
-    ///////////////////////////////////////////////////////////////////////////
-    //
-    // A target list means a list of expressions after SELECT keyword in SQL.
-    //
-    // SELECT rel1.col1, sum(rel1.col2), res1.col3 + res2.col2, ... FROM ...
-    //        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-    //                        TARGET LIST
-    //
-    ///////////////////////////////////////////////////////////////////////////
-
-    public void initTargetList(Target[] original) {
-      targetListManager = new TargetListManager(LogicalPlan.this, original);
-    }
-
-    public boolean isTargetResolved(int targetId) {
-      return targetListManager.isResolved(targetId);
-    }
-
-    public void resolveAllTargetList() {
-      targetListManager.resolveAll();
-    }
-
-    public void resolveTarget(int idx) {
-      targetListManager.resolve(idx);
-    }
-
-    public boolean isAlreadyTargetCreated(int idx) {
-      return getTarget(idx) != null;
-    }
-
-    public Target getTarget(int idx) {
-      return targetListManager.getTarget(idx);
-    }
-
-    public int getTargetListNum() {
-      return targetListManager.size();
-    }
-
-    public void fillTarget(int idx) throws PlanningException {
-      Target target = planner.createTarget(LogicalPlan.this, this, getProjection().getTargets()[idx]);
-      // below code reaches only when target is created.
-      targetListManager.fill(idx, target);
-    }
-
-    public boolean checkIfTargetCanBeEvaluated(int targetId, LogicalNode node) {
-      return isAlreadyTargetCreated(targetId)
-          && PlannerUtil.canBeEvaluated(targetListManager.getTarget(targetId).getEvalTree(), node);
-    }
-
-    public TargetListManager getTargetListManager() {
-      return targetListManager;
-    }
-
-    public Target [] getCurrentTargets() {
-      return targetListManager.getTargets();
-    }
-
-    public Schema updateSchema() {
-      return targetListManager.getUpdatedSchema();
-    }
-
-    public void fillTargets() throws PlanningException {
-      for (int i = 0; i < getTargetListNum(); i++) {
-        if (!isAlreadyTargetCreated(i)) {
-          try {
-            fillTarget(i);
-          } catch (VerifyException e) {
-          }
-        }
-      }
-    }
-
-    public void checkAndResolveTargets(LogicalNode node) throws PlanningException {
-      // If all columns are projected and do not include any expression
-      if (getProjection().isAllProjected() && node instanceof RelationNode) {
-        initTargetList(PlannerUtil.schemaToTargets(node.getOutSchema()));
-        resolveAllTargetList();
-
-      } else {
-        // fill a target if an annotated target can be created.
-        // Some targets which are based on multiple relations can be created only in a join node.
-        fillTargets();
-
-        // add target to list if a target can be evaluated at this node
-        List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
-        for (int i = 0; i < getTargetListNum(); i++) {
-          if (getTarget(i) != null && !isTargetResolved(i)) {
-            EvalNode expr = getTarget(i).getEvalTree();
-
-            if (checkIfTargetCanBeEvaluated(i, node)) {
-
-              if (node instanceof RelationNode) { // for scan node
-                if (expr.getType() == EvalType.FIELD) {
-                  resolveTarget(i);
-                  if (getTarget(i).hasAlias()) {
-                    newEvaluatedTargetIds.add(i);
-                  }
-                } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
-                  // if this expression does no contain any aggregation function
-                  resolveTarget(i);
-                  newEvaluatedTargetIds.add(i);
-                }
-
-              } else if (node instanceof GroupbyNode) { // for grouping
-                if (EvalTreeUtil.findDistinctAggFunction(expr).size() > 0) {
-                  resolveTarget(i);
-                  newEvaluatedTargetIds.add(i);
-                }
-
-              } else if (node instanceof JoinNode) { // for join
-                if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
-                  // if this expression does no contain any aggregation function,
-                  resolveTarget(i);
-                  newEvaluatedTargetIds.add(i);
-                }
-              }
-            }
-          }
-        }
-
-        if (node instanceof ScanNode || node instanceof JoinNode) {
-
-          Schema baseSchema = null;
-          if (node instanceof ScanNode) {
-            baseSchema = ((ScanNode)node).getTableSchema();
-          } else if (node instanceof JoinNode) {
-            baseSchema = node.getInSchema(); // composite schema
-          }
-
-          if (newEvaluatedTargetIds.size() > 0) {
-            // fill addedTargets with output columns and new expression columns (e.g., aliased column or expressions)
-            Target[] addedTargets = new Target[baseSchema.getColumnNum() + newEvaluatedTargetIds.size()];
-            PlannerUtil.schemaToTargets(baseSchema, addedTargets);
-            int baseIdx = baseSchema.getColumnNum();
-            for (int i = 0; i < newEvaluatedTargetIds.size(); i++) {
-              addedTargets[baseIdx + i] = getTarget(newEvaluatedTargetIds.get(i));
-            }
-
-            // set targets to ScanNode because it needs to evaluate expressions
-            ((Projectable)node).setTargets(addedTargets);
-            // the output schema of ScanNode has to have the combination of the original output and newly-added targets.
-            node.setOutSchema(LogicalPlanner.getProjectedSchema(LogicalPlan.this, addedTargets));
-          } else {
-            // if newEvaluatedTargetIds == 0, the original input schema will be used as the output schema.
-            node.setOutSchema(node.getInSchema());
-          }
-        } else if (node instanceof GroupbyNode) {
-          // Set the current targets to the GroupByNode because the GroupByNode is the last projection operator.
-          GroupbyNode groupbyNode = (GroupbyNode) node;
-          groupbyNode.setTargets(targetListManager.getUpdatedTarget(Sets.newHashSet(newEvaluatedTargetIds)));
-          //groupbyNode.setTargets(getCurrentTargets());
-          boolean distinct = false;
-          for (Target target : groupbyNode.getTargets()) {
-            for (AggregationFunctionCallEval aggrFunc : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
-              if (aggrFunc.isDistinct()) {
-                distinct = true;
-                break;
-              }
-            }
-          }
-          groupbyNode.setDistinct(distinct);
-          node.setOutSchema(updateSchema());
-
-          // if a having condition is given,
-          if (hasHaving()) {
-            EvalNode havingCondition = planner.createEvalTree(LogicalPlan.this, this, getHaving().getQual());
-            List<AggregationFunctionCallEval> aggrFunctions = EvalTreeUtil.findDistinctAggFunction(havingCondition);
-
-            if (aggrFunctions.size() == 0) {
-              groupbyNode.setHavingCondition(havingCondition);
-            } else {
-              Target [] addedTargets = new Target[aggrFunctions.size()];
-              for (int i = 0; i < aggrFunctions.size(); i++) {
-                Target aggrFunctionTarget = new Target(aggrFunctions.get(i),
-                    newNonameColumnName(aggrFunctions.get(i).getName()));
-                addedTargets[i] = aggrFunctionTarget;
-                EvalTreeUtil.replace(havingCondition, aggrFunctions.get(i),
-                    new FieldEval(aggrFunctionTarget.getColumnSchema()));
-              }
-              Target [] updatedTargets = TUtil.concat(groupbyNode.getTargets(), addedTargets);
-              groupbyNode.setTargets(updatedTargets);
-              groupbyNode.setHavingCondition(havingCondition);
-              groupbyNode.setHavingSchema(PlannerUtil.targetToSchema(groupbyNode.getTargets()));
-            }
-          }
-        }
-      }
-    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
new file mode 100644
index 0000000..3ab328c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalType;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+import java.util.Stack;
+
+/**
+ * It finds all relations for each block and builds base schema information.
+ */
+class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor.PreprocessContext, LogicalNode> {
+  private ExprAnnotator annotator;
+
+  /** Visitor context holding the plan under construction and the query block currently being visited. */
+  static class PreprocessContext {
+    LogicalPlan plan;
+    LogicalPlan.QueryBlock currentBlock;
+
+    /** Creates a context bound to the given plan and block. */
+    public PreprocessContext(LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) {
+      this.plan = plan;
+      this.currentBlock = currentBlock;
+    }
+
+    /** Creates a context for another block that shares the plan of an existing context. */
+    public PreprocessContext(PreprocessContext context, LogicalPlan.QueryBlock currentBlock) {
+      this(context.plan, currentBlock);
+    }
+  }
+
+  /** Catalog service */
+  private CatalogService catalog;
+
+  /**
+   * @param catalog   catalog service used to look up table descriptions
+   * @param annotator annotator used to create eval nodes from algebra expressions
+   */
+  LogicalPlanPreprocessor(CatalogService catalog, ExprAnnotator annotator) {
+    this.annotator = annotator;
+    this.catalog = catalog;
+  }
+
+  /**
+   * Pre-visit hook: records the expr in the current block (grouped by its operator type) and maps the expr
+   * to the current block's name in the plan.
+   */
+  @Override
+  public void preHook(PreprocessContext ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
+    final LogicalPlan.QueryBlock block = ctx.currentBlock;
+    block.setAlgebraicExpr(expr);
+    ctx.plan.mapExprToBlock(expr, block.getName());
+  }
+
+  /**
+   * Post-visit hook: registers the node produced for the expr with the current block and links the expr to it.
+   * A null result (reportedly possible for a statement without a FROM clause) is returned without registration.
+   *
+   * @return the given result, unchanged
+   */
+  @Override
+  public LogicalNode postHook(PreprocessContext ctx, Stack<Expr> stack, Expr expr, LogicalNode result) throws PlanningException {
+    if (result == null) {
+      return null;
+    }
+
+    final LogicalPlan.QueryBlock block = ctx.currentBlock;
+    // registerNode records the node in the block and in the plan.
+    block.registerNode(result);
+    // Remember which logical node corresponds to this expr.
+    block.registerExprWithNode(expr, result);
+    return result;
+  }
+
+  /**
+   * Builds a ProjectionNode carrying only schema information for a projection expr.
+   * When the projection has no child (a statement without a FROM clause), an EvalExprNode is returned at once.
+   * Otherwise the child is visited and the output schema is derived from the targets: all columns of the child
+   * when everything is projected, or one target per named expr.
+   */
+  @Override
+  public LogicalNode visitProjection(PreprocessContext ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
+    if (!expr.hasChild()) {
+      return new EvalExprNode(ctx.plan.newPID());
+    }
+
+    stack.push(expr);
+    final LogicalNode input = visit(ctx, stack, expr.getChild());
+
+    final Target [] targets;
+    if (expr.isAllProjected()) {
+      targets = PlannerUtil.schemaToTargets(input.getOutSchema());
+    } else {
+      final NamedExpr [] namedExprs = expr.getNamedExprs();
+      targets = new Target[namedExprs.length];
+
+      for (int pos = 0; pos < namedExprs.length; pos++) {
+        final NamedExpr named = namedExprs[pos];
+        final EvalNode eval = annotator.createEvalNode(ctx.plan, ctx.currentBlock, named.getExpr());
+
+        if (named.hasAlias()) {
+          targets[pos] = new Target(eval, named.getAlias());
+        } else if (eval.getType() == EvalType.FIELD) {
+          // A plain field reference without an alias becomes a field target.
+          targets[pos] = new Target((FieldEval) eval);
+        } else {
+          // Any other unaliased expression gets a generated name based on its position.
+          targets[pos] = new Target(eval, "?name_" + pos);
+        }
+      }
+    }
+    stack.pop();
+
+    final ProjectionNode projection = new ProjectionNode(ctx.plan.newPID());
+    projection.setInSchema(input.getOutSchema());
+    projection.setOutSchema(PlannerUtil.targetToSchema(targets));
+    return projection;
+  }
+
+  /** Visits the child, then returns a new LimitNode whose input and output schemas are the child's output schema. */
+  @Override
+  public LogicalNode visitLimit(PreprocessContext ctx, Stack<Expr> stack, Limit expr) throws PlanningException {
+    stack.push(expr);
+    final LogicalNode input = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    final LimitNode limit = new LimitNode(ctx.plan.newPID());
+    final Schema passThrough = input.getOutSchema();
+    limit.setInSchema(passThrough);
+    limit.setOutSchema(passThrough);
+    return limit;
+  }
+
+  /** Visits the child, then returns a new SortNode whose input and output schemas are the child's output schema. */
+  @Override
+  public LogicalNode visitSort(PreprocessContext ctx, Stack<Expr> stack, Sort expr) throws PlanningException {
+    stack.push(expr);
+    final LogicalNode input = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    final SortNode sort = new SortNode(ctx.plan.newPID());
+    final Schema passThrough = input.getOutSchema();
+    sort.setInSchema(passThrough);
+    sort.setOutSchema(passThrough);
+    return sort;
+  }
+
+  /** Visits the child, then returns a new HavingNode whose input and output schemas are the child's output schema. */
+  @Override
+  public LogicalNode visitHaving(PreprocessContext ctx, Stack<Expr> stack, Having expr) throws PlanningException {
+    stack.push(expr);
+    final LogicalNode input = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    final HavingNode having = new HavingNode(ctx.plan.newPID());
+    final Schema passThrough = input.getOutSchema();
+    having.setInSchema(passThrough);
+    having.setOutSchema(passThrough);
+    return having;
+  }
+
+  /**
+   * Visits the child, then returns a new GroupbyNode. Its output schema is derived from the current block's
+   * projection: one target per named expr, named by its alias when present and by "?name_" plus its position
+   * otherwise.
+   */
+  @Override
+  public LogicalNode visitGroupBy(PreprocessContext ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    stack.push(expr);
+    final LogicalNode input = visit(ctx, stack, expr.getChild());
+
+    final Projection projection = ctx.currentBlock.getSingletonExpr(OpType.Projection);
+    final NamedExpr [] namedExprs = projection.getNamedExprs();
+    final Target [] targets = new Target[namedExprs.length];
+
+    for (int pos = 0; pos < targets.length; pos++) {
+      final NamedExpr named = namedExprs[pos];
+      final EvalNode eval = annotator.createEvalNode(ctx.plan, ctx.currentBlock, named.getExpr());
+      final String targetName = named.hasAlias() ? named.getAlias() : "?name_" + pos;
+      targets[pos] = new Target(eval, targetName);
+    }
+    stack.pop();
+
+    final GroupbyNode groupBy = new GroupbyNode(ctx.plan.newPID());
+    groupBy.setInSchema(input.getOutSchema());
+    groupBy.setOutSchema(PlannerUtil.targetToSchema(targets));
+    return groupBy;
+  }
+
+  /**
+   * Visits each side of the set operation in its own new query block with a fresh stack, links each side's expr
+   * to its resulting node in the current block, and returns a UnionNode over both children. The union's input
+   * and output schemas are both taken from the left child's output schema.
+   */
+  @Override
+  public LogicalNode visitUnion(PreprocessContext ctx, Stack<Expr> stack, SetOperation expr) throws PlanningException {
+    // Left side.
+    final PreprocessContext leftCtx = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
+    final LogicalNode lhs = visit(leftCtx, new Stack<Expr>(), expr.getLeft());
+    ctx.currentBlock.registerExprWithNode(expr.getLeft(), lhs);
+
+    // Right side.
+    final PreprocessContext rightCtx = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
+    final LogicalNode rhs = visit(rightCtx, new Stack<Expr>(), expr.getRight());
+    ctx.currentBlock.registerExprWithNode(expr.getRight(), rhs);
+
+    final UnionNode union = new UnionNode(ctx.plan.newPID());
+    union.setLeftChild(lhs);
+    union.setRightChild(rhs);
+    union.setInSchema(lhs.getOutSchema());
+    union.setOutSchema(lhs.getOutSchema());
+    return union;
+  }
+
+  /**
+   * Visits the child, then returns a new SelectionNode whose input and output schemas are the child's output
+   * schema.
+   */
+  @Override
+  public LogicalNode visitFilter(PreprocessContext ctx, Stack<Expr> stack, Selection expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    SelectionNode selectionNode = new SelectionNode(ctx.plan.newPID());
+    selectionNode.setInSchema(child.getOutSchema());
+    selectionNode.setOutSchema(child.getOutSchema());
+    return selectionNode;
+  }
+
+  /**
+   * Visits the left and then the right side of the join, and returns a new JoinNode whose input and output
+   * schemas are the merge of both sides' output schemas. The join type is also recorded in the current block.
+   */
+  @Override
+  public LogicalNode visitJoin(PreprocessContext ctx, Stack<Expr> stack, Join expr) throws PlanningException {
+    stack.push(expr);
+    final LogicalNode lhs = visit(ctx, stack, expr.getLeft());
+    final LogicalNode rhs = visit(ctx, stack, expr.getRight());
+    stack.pop();
+
+    final JoinNode join = new JoinNode(ctx.plan.newPID());
+    join.setJoinType(expr.getJoinType());
+
+    final Schema combined = SchemaUtil.merge(lhs.getOutSchema(), rhs.getOutSchema());
+    join.setInSchema(combined);
+    join.setOutSchema(combined);
+
+    ctx.currentBlock.addJoinType(expr.getJoinType());
+    return join;
+  }
+
+  /**
+   * Creates a ScanNode for the table description the catalog returns for the relation's name (passing the
+   * relation's alias when it has one) and adds the node to the current block's relations.
+   */
+  @Override
+  public LogicalNode visitRelation(PreprocessContext ctx, Stack<Expr> stack, Relation expr)
+      throws PlanningException {
+
+    final TableDesc tableDesc = catalog.getTableDesc(expr.getName());
+
+    final ScanNode scan = expr.hasAlias()
+        ? new ScanNode(ctx.plan.newPID(), tableDesc, expr.getAlias())
+        : new ScanNode(ctx.plan.newPID(), tableDesc);
+
+    ctx.currentBlock.addRelation(scan);
+    return scan;
+  }
+
+  /**
+   * Visits a table subquery inside a new block named after the subquery, then wraps the result in a
+   * TableSubQueryNode and adds that node to the enclosing block's relations.
+   */
+  @Override
+  public LogicalNode visitTableSubQuery(PreprocessContext ctx, Stack<Expr> stack, TablePrimarySubQuery expr)
+      throws PlanningException {
+
+    // Note: a table subquery always has a table name.
+    // SELECT .... FROM (SELECT ...) TB_NAME <-
+    final LogicalPlan.QueryBlock subQueryBlock = ctx.plan.newAndGetBlock(expr.getName());
+    final PreprocessContext subQueryCtx = new PreprocessContext(ctx, subQueryBlock);
+    final LogicalNode subQueryRoot = super.visitTableSubQuery(subQueryCtx, stack, expr);
+
+    // The subquery is treated as a relation of the enclosing block.
+    final TableSubQueryNode subQueryNode = new TableSubQueryNode(ctx.plan.newPID(), expr.getName(), subQueryRoot);
+    ctx.currentBlock.addRelation(subQueryNode);
+    return subQueryNode;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns a new CreateTableNode. When the statement has a subquery, the subquery is visited in the current
+   * context first; the visit's result is not attached to the returned node here.
+   */
+  @Override
+  public LogicalNode visitCreateTable(PreprocessContext ctx, Stack<Expr> stack, CreateTable expr)
+      throws PlanningException {
+
+    final CreateTableNode createTable = new CreateTableNode(ctx.plan.newPID());
+
+    final boolean withSubQuery = expr.hasSubQuery();
+    if (withSubQuery) {
+      stack.push(expr);
+      visit(ctx, stack, expr.getSubQuery());
+      stack.pop();
+    }
+
+    return createTable;
+  }
+
+  /** Returns a new DropTableNode with a newly allocated PID; the expr itself is not inspected. */
+  @Override
+  public LogicalNode visitDropTable(PreprocessContext ctx, Stack<Expr> stack, DropTable expr)
+      throws PlanningException {
+    return new DropTableNode(ctx.plan.newPID());
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Delegates to the base visitor inside a new query block, then returns a new InsertNode whose input and
+   * output schemas are the output schema of the node the base visitor returned.
+   */
+  @Override
+  public LogicalNode visitInsert(PreprocessContext ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+    PreprocessContext newContext = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
+    LogicalNode child = super.visitInsert(newContext, stack, expr);
+
+    InsertNode insertNode = new InsertNode(ctx.plan.newPID());
+    insertNode.setInSchema(child.getOutSchema());
+    insertNode.setOutSchema(child.getOutSchema());
+    return insertNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index e48b655..41e42d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -71,9 +71,6 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
       }
     }
 
-    if (node.hasHavingCondition()) {
-      ExprsVerifier.verify(state, node.getHavingCondition());
-    }
     return node;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index 697a8c6..bf41996 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -35,6 +35,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitSort(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node,
                    Stack<LogicalNode> stack) throws PlanningException;
 
+  RESULT visitHaving(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
+                      Stack<LogicalNode> stack) throws PlanningException;
+
   RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
                       Stack<LogicalNode> stack) throws PlanningException;