You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/12 07:07:21 UTC

[2/2] git commit: TAJO-121: Add LogicalPlanVisitor and Refactor LogicalOptimizer to use the visitor. (hyunsik)

TAJO-121: Add LogicalPlanVisitor and Refactor LogicalOptimizer to use the visitor. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/cc2f5c8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/cc2f5c8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/cc2f5c8e

Branch: refs/heads/master
Commit: cc2f5c8ea9c7a3f1176cf8573b85b3f35194b0f4
Parents: cf6bd4b
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Aug 12 13:53:48 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Aug 12 14:06:50 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   6 +-
 .../engine/planner/BasicLogicalPlanVisitor.java | 200 +++++++
 .../tajo/engine/planner/LogicalOptimizer.java   | 530 +------------------
 .../tajo/engine/planner/LogicalPlanVisitor.java |  50 ++
 .../tajo/engine/planner/LogicalPlanner.java     |   2 +-
 .../tajo/engine/planner/TargetListManager.java  |   5 +-
 .../planner/rewrite/FilterPushDownRule.java     | 155 ++++++
 .../planner/rewrite/ProjectionPushDownRule.java | 376 +++++++++++++
 .../org/apache/tajo/master/GlobalEngine.java    |   9 +-
 .../org/apache/tajo/BackendTestingUtil.java     |   5 +-
 .../plan/global/TestGlobalQueryPlanner.java     |  14 +-
 .../tajo/engine/planner/TestLogicalNode.java    |   3 +-
 .../engine/planner/TestLogicalOptimizer.java    |  15 +-
 .../global/TestGlobalQueryOptimizer.java        |   4 +-
 .../planner/physical/TestBNLJoinExec.java       |  14 +-
 .../planner/physical/TestBSTIndexExec.java      |  19 +-
 .../planner/physical/TestExternalSortExec.java  |   5 +-
 .../planner/physical/TestHashJoinExec.java      |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |  43 +-
 .../engine/planner/physical/TestSortExec.java   |   4 +-
 .../tajo/master/TestExecutionBlockCursor.java   |  21 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |  16 +-
 22 files changed, 903 insertions(+), 597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d48c15c..f41a886 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,12 +15,16 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-121: Add LogicalPlanVisitor and Refactor LogicalOptimizer to use the
+    visitor. (hyunsik)
+
     TAJO-118: Refactor and Improve text file Scanner. (jinho)
 
     TAJO-95: Eliminate the lazy copy approach from the classes wrapping
     protobuf-generated classes. (hyunsik)
 
-    TAJO-102: Add AlgebraVisitor and Refactor LogicalPlanner to use the visitor.    (hyunsik)
+    TAJO-102: Add AlgebraVisitor and Refactor LogicalPlanner to use the visitor.
+    (hyunsik)
 
     TAJO-87: Integration of tajo algebra module and SQL parser. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
new file mode 100644
index 0000000..a5e9530
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.Stack;
+
+public class BasicLogicalPlanVisitor<T> implements LogicalPlanVisitor<T> {
+
+  /**
+   * The prehook is called before each node is visited.
+   */
+  @SuppressWarnings("unused")
+  public void preHook(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, T data) throws PlanningException {
+  }
+
+  /**
+   * The posthook is called after each node is visited.
+   */
+  @SuppressWarnings("unused")
+  public void postHook(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+  }
+
+  /**
+   * visitChild visits each logicalNode recursively.
+   */
+  public LogicalNode visitChild(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    LogicalNode current;
+    switch (node.getType()) {
+      case ROOT:
+        current = visitRoot(plan, (LogicalRootNode) node, stack, data);
+        break;
+      case PROJECTION:
+        current = visitProjection(plan, (ProjectionNode) node, stack, data);
+        break;
+      case LIMIT:
+        current = visitLimit(plan, (LimitNode) node, stack, data);
+        break;
+      case SORT:
+        current = visitSort(plan, (SortNode) node, stack, data);
+        break;
+      case GROUP_BY:
+        current = visitGroupBy(plan, (GroupbyNode) node, stack, data);
+        break;
+      case SELECTION:
+        current = visitFilter(plan, (SelectionNode) node, stack, data);
+        break;
+      case JOIN:
+        current = visitJoin(plan, (JoinNode) node, stack, data);
+        break;
+      case UNION:
+        current = visitUnion(plan, (UnionNode) node, stack, data);
+        break;
+      case EXCEPT:
+        current = visitExcept(plan, (ExceptNode) node, stack, data);
+        break;
+      case INTERSECT:
+        current = visitIntersect(plan, (IntersectNode) node, stack, data);
+        break;
+      case SCAN:
+        current = visitScan(plan, (ScanNode) node, stack, data);
+        break;
+      case STORE:
+        current = visitStoreTable(plan, (StoreTableNode) node, stack, data);
+        break;
+      default:
+        current = node; // nothing to do for others
+    }
+
+    return current;
+  }
+
+  @Override
+  public LogicalNode visitRoot(LogicalPlan plan, LogicalRootNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(LogicalPlan plan, GroupbyNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitFilter(LogicalPlan plan, SelectionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitJoin(LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getOuterNode(), stack, data);
+    visitChild(plan, node.getInnerNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getOuterNode(), stack, data);
+    visitChild(plan, node.getInnerNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitExcept(LogicalPlan plan, ExceptNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getOuterNode(), stack, data);
+    visitChild(plan, node.getInnerNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitIntersect(LogicalPlan plan, IntersectNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getOuterNode(), stack, data);
+    visitChild(plan, node.getInnerNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitStoreTable(LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException {
+    stack.push(node);
+    visitChild(plan, node.getSubNode(), stack, data);
+    stack.pop();
+    return node;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index ad245c0..dcd4fca 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -18,533 +18,27 @@
 
 package org.apache.tajo.engine.planner;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.query.exception.InvalidQueryException;
-import org.apache.tajo.engine.utils.SchemaUtil;
-
-import java.util.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
+import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 
 /**
  * This class optimizes a logical plan.
  */
 public class LogicalOptimizer {
-  private static Log LOG = LogFactory.getLog(LogicalOptimizer.class);
-
-  private LogicalOptimizer() {
-  }
-
-  public static class OptimizationContext {
-    TargetListManager targetListManager;
-
-    public OptimizationContext(LogicalPlan plan, String blockName) {
-      this.targetListManager = new TargetListManager(plan, blockName);
-    }
-
-    public OptimizationContext(LogicalPlan plan, Target [] targets) {
-      this.targetListManager = new TargetListManager(plan, targets);
-    }
-
-    public TargetListManager getTargetListManager() {
-      return this.targetListManager;
-    }
-  }
-
-  public static LogicalNode optimize(LogicalPlan plan) throws PlanningException {
-    LogicalNode toBeOptimized;
-
-    toBeOptimized = plan.getRootBlock().getRoot();
-
-    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
-      LOG.info("This query skips the logical optimization step.");
-    } else {
-      if(PlannerUtil.findTopNode(toBeOptimized, ExprType.SELECTION) != null) {
-            pushSelection(toBeOptimized);
-      }
-      pushProjection(plan);
-    }
-    return toBeOptimized;
-  }
-
-  /**
-   * This method pushes down the selection into the appropriate sub 
-   * logical operators.
-   * <br />
-   * 
-   * There are three operators that can have search conditions.
-   * Selection, Join, and GroupBy clause.
-   * However, the search conditions of Join and GroupBy cannot be pushed down 
-   * into child operators because they can be used when the data layout change
-   * caused by join and grouping relations.
-   * <br />
-   * 
-   * However, some of the search conditions of selection clause can be pushed 
-   * down into appropriate sub operators. Some comparison expressions on 
-   * multiple relations are actually join conditions, and other expression 
-   * on single relation can be used in a scan operator or an Index Scan 
-   * operator.   
-   *
-   * @param plan
-   */
-  private static void pushSelection(LogicalNode plan) {
-    SelectionNode selNode = (SelectionNode) PlannerUtil.findTopNode(plan,
-        ExprType.SELECTION);
-    Preconditions.checkNotNull(selNode);
-    
-    Stack<LogicalNode> stack = new Stack<LogicalNode>();
-    EvalNode [] cnf = EvalTreeUtil.getConjNormalForm(selNode.getQual());
-    pushSelectionRecursive(plan, Lists.newArrayList(cnf), stack);
-  }
-
-  private static void pushSelectionRecursive(LogicalNode plan,
-                                             List<EvalNode> cnf, Stack<LogicalNode> stack) {
-    
-    switch(plan.getType()) {
-
-      case ROOT:
-        LogicalRootNode rootNode = (LogicalRootNode) plan;
-        pushSelectionRecursive(rootNode.getSubNode(), cnf, stack);
-        break;
-
-    case SELECTION:
-      SelectionNode selNode = (SelectionNode) plan;
-      stack.push(selNode);
-      pushSelectionRecursive(selNode.getSubNode(),cnf, stack);
-      stack.pop();
-      
-      // remove the selection operator if there is no search condition 
-      // after selection push.
-      if(cnf.size() == 0) {
-        LogicalNode node = stack.peek();
-        if (node instanceof UnaryNode) {
-          UnaryNode unary = (UnaryNode) node;
-          unary.setSubNode(selNode.getSubNode());
-        } else {
-          throw new InvalidQueryException("Unexpected Logical Query Plan");
-        }
-      }
-      break;
-    case JOIN:
-      JoinNode join = (JoinNode) plan;
-
-      LogicalNode outer = join.getOuterNode();
-      LogicalNode inner = join.getInnerNode();
-
-      pushSelectionRecursive(outer, cnf, stack);
-      pushSelectionRecursive(inner, cnf, stack);
-
-      List<EvalNode> matched = Lists.newArrayList();
-      for (EvalNode eval : cnf) {
-        if (PlannerUtil.canBeEvaluated(eval, plan)) {
-          matched.add(eval);
-        }
-      }
-
-      EvalNode qual = null;
-      if (matched.size() > 1) {
-        // merged into one eval tree
-        qual = EvalTreeUtil.transformCNF2Singleton(
-            matched.toArray(new EvalNode [matched.size()]));
-      } else if (matched.size() == 1) {
-        // if the number of matched expr is one
-        qual = matched.get(0);
-      }
-
-      if (qual != null) {
-        JoinNode joinNode = (JoinNode) plan;
-        if (joinNode.hasJoinQual()) {
-          EvalNode conjQual = EvalTreeUtil.
-              transformCNF2Singleton(joinNode.getJoinQual(), qual);
-          joinNode.setJoinQual(conjQual);
-        } else {
-          joinNode.setJoinQual(qual);
-        }
-        if (joinNode.getJoinType() == JoinType.CROSS_JOIN) {
-          joinNode.setJoinType(JoinType.INNER);
-        }
-        cnf.removeAll(matched);
-      }
-
-      break;
-
-    case SCAN:
-      matched = Lists.newArrayList();
-      for (EvalNode eval : cnf) {
-        if (PlannerUtil.canBeEvaluated(eval, plan)) {
-          matched.add(eval);
-        }
-      }
+  private BasicQueryRewriteEngine rewriteEngine;
 
-      qual = null;
-      if (matched.size() > 1) {
-        // merged into one eval tree
-        qual = EvalTreeUtil.transformCNF2Singleton(
-            matched.toArray(new EvalNode [matched.size()]));
-      } else if (matched.size() == 1) {
-        // if the number of matched expr is one
-        qual = matched.get(0);
-      }
+  public LogicalOptimizer() {
+    rewriteEngine = new BasicQueryRewriteEngine();
 
-      if (qual != null) { // if a matched qual exists
-        ScanNode scanNode = (ScanNode) plan;
-        scanNode.setQual(qual);
-      }
-
-      cnf.removeAll(matched);
-      break;
-
-    default:
-      stack.push(plan);
-      if (plan instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) plan;
-        pushSelectionRecursive(unary.getSubNode(), cnf, stack);
-      } else if (plan instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) plan;
-        pushSelectionRecursive(binary.getOuterNode(), cnf, stack);
-        pushSelectionRecursive(binary.getInnerNode(), cnf, stack);
-      }
-      stack.pop();
-      break;
-    }
-  }
-
-  /**
-   * This method pushes down the projection list into the appropriate and
-   * below logical operators.
-   * @param plan
-   */
-  private static void pushProjection(LogicalPlan plan)
-      throws PlanningException {
-    Stack<LogicalNode> stack = new Stack<LogicalNode>();
-
-    OptimizationContext optCtx;
-
-    if (plan.getRootBlock().getProjection() != null &&
-        plan.getRootBlock().getProjection().isAllProjected()) {
-      optCtx = new OptimizationContext(plan,
-          plan.getRootBlock().getProjectionNode().getTargets());
-    } else {
-      optCtx = new OptimizationContext(plan, LogicalPlan.ROOT_BLOCK);
-    }
-
-    Collection<Column> finalSchema = plan.getRootBlock().getSchema().getColumns();
-
-    pushProjectionRecursive(plan, optCtx, plan.getRootBlock().getRoot(), stack,
-        new HashSet<Column>(finalSchema));
-
-  }
-
-  /**
-   * This method visits all operators recursively and shrink output columns according to only necessary columns
-   * at each operator.
-   *
-   * This method has three steps:
-   * <ol>
-   *   <li></li> collect column references necessary for each operator. For example, sort requires sortkeys,
-   *   join requires join conditions, selection requires filter conditions, and groupby requires grouping keys and having
-   *
-   * 2) shrink the output schema of each operator so that the operator reduces the output columns according to
-   * the necessary columns of their parent operators
-   * 3) shrink the input schema of each operator according to the shrunk output schemas of the child operators.
-   */
-  private static LogicalNode pushProjectionRecursive(
-      final LogicalPlan plan, final OptimizationContext optContext,
-      final LogicalNode node, final Stack<LogicalNode> stack,
-      final Set<Column> upperRequired) throws PlanningException {
-
-    LogicalNode currentNode = null;
-
-    switch (node.getType()) {
-      // They need only simple work
-      case ROOT:
-      case STORE:
-      case LIMIT:
-        currentNode = pushDownCommonPost(plan, optContext, (UnaryNode) node, upperRequired, stack);
-        break;
-
-      // They need special preworks.
-      case SELECTION:
-        currentNode = pushDownSelection(plan, optContext, (SelectionNode) node, upperRequired, stack);
-        break;
-      case SORT:
-        currentNode = pushDownSort(plan, optContext, (SortNode) node, upperRequired, stack);
-        break;
-      case UNION:
-      case EXCEPT:
-      case INTERSECT:
-        currentNode = pushDownSetNode(plan, optContext, (UnionNode) node, upperRequired, stack);
-        break;
-
-      // Projection, GroupBy, Join, and Scan are all projectable.
-      // A projectable operator can shrink or expand output columns through alias name and expressions.
-      case PROJECTION:
-        currentNode = pushDownProjection(plan, optContext, (ProjectionNode) node, upperRequired, stack);
-        break;
-      case GROUP_BY:
-        currentNode = pushDownGroupBy(plan, optContext, (GroupbyNode) node, upperRequired, stack);
-        break;
-      case JOIN:
-        currentNode = pushDownJoin(plan, optContext, (JoinNode) node, upperRequired, stack);
-        break;
-      case SCAN:
-        currentNode = pushdownScanNode(optContext, (ScanNode) node, upperRequired, stack);
-        break;
-
-      default:
-    }
-
-    return currentNode;
-  }
-
-  private static LogicalNode pushDownCommonPost(LogicalPlan plan, OptimizationContext context, UnaryNode node,
-                                                Set<Column> upperRequired, Stack<LogicalNode> stack) throws PlanningException {
-    stack.push(node);
-    LogicalNode child = pushProjectionRecursive(plan, context,
-        node.getSubNode(), stack, upperRequired);
-    stack.pop();
-    node.setInSchema(child.getOutSchema());
-    node.setOutSchema(child.getOutSchema());
-
-    if (node instanceof Projectable) {
-      pushDownProjectablePost(context, node, upperRequired, isTopmostProjectable(stack));
-    }
-    return node;
+    rewriteEngine.addRewriteRule(new FilterPushDownRule());
+    rewriteEngine.addRewriteRule(new ProjectionPushDownRule());
   }
 
-  private static LogicalNode pushDownProjection(LogicalPlan plan, OptimizationContext context,
-                                                   ProjectionNode projNode, Set<Column> upperRequired,
-                                                   Stack<LogicalNode> path) throws PlanningException {
-
-    for (Target target : projNode.getTargets()) {
-      upperRequired.add(target.getColumnSchema());
-    }
-
-    path.push(projNode);
-    LogicalNode child = pushProjectionRecursive(plan, context, projNode.getSubNode(), path, upperRequired);
-    path.pop();
-
-    LogicalNode childNode = projNode.getSubNode();
-
-    // If all expressions are evaluated in the child operators and the last operator is projectable,
-    // ProjectionNode will not be necessary. It eliminates ProjectionNode.
-    if (context.getTargetListManager().isAllEvaluated() && (childNode instanceof Projectable)) {
-      LogicalNode parent = path.peek();
-      // update the child node's output schemas
-      child.setOutSchema(context.getTargetListManager().getUpdatedSchema());
-      PlannerUtil.deleteNode(parent, projNode);
-      return child;
-    } else {
-      projNode.setInSchema(child.getOutSchema());
-      projNode.setTargets(context.getTargetListManager().getUpdatedTarget());
-      return projNode;
-    }
-  }
-
-  private static LogicalNode pushDownSelection(LogicalPlan plan, OptimizationContext context,
-                                                 SelectionNode selectionNode, Set<Column> upperRequired,
-                                                 Stack<LogicalNode> path) throws PlanningException {
-    if (selectionNode.getQual() != null) {
-      upperRequired.addAll(EvalTreeUtil.findDistinctRefColumns(selectionNode.getQual()));
-    }
-
-    return pushDownCommonPost(plan, context, selectionNode, upperRequired, path);
-  }
-
-  private static GroupbyNode pushDownGroupBy(LogicalPlan plan, OptimizationContext context, GroupbyNode groupbyNode,
-                                             Set<Column> upperRequired, Stack<LogicalNode> stack)
-      throws PlanningException {
-
-    Set<Column> currentRequired = new HashSet<Column>(upperRequired);
-
-    if (groupbyNode.hasHavingCondition()) {
-      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(groupbyNode.getHavingCondition()));
-    }
-
-    for (Target target : groupbyNode.getTargets()) {
-      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
-    }
-
-    pushDownCommonPost(plan, context, groupbyNode, currentRequired, stack);
-    return groupbyNode;
-  }
-
-  private static SortNode pushDownSort(LogicalPlan plan, OptimizationContext context, SortNode sortNode,
-                                       Set<Column> upperRequired, Stack<LogicalNode> stack) throws PlanningException {
-
-    for (SortSpec spec : sortNode.getSortKeys()) {
-      upperRequired.add(spec.getSortKey());
-    }
-
-    pushDownCommonPost(plan, context, sortNode, upperRequired, stack);
-
-    return sortNode;
-  }
-
-  private static JoinNode pushDownJoin(LogicalPlan plan, OptimizationContext context, JoinNode joinNode,
-                                       Set<Column> upperRequired, Stack<LogicalNode> path)
-      throws PlanningException {
-    Set<Column> currentRequired = Sets.newHashSet(upperRequired);
-
-    if (joinNode.hasTargets()) {
-      EvalNode expr;
-      for (Target target : joinNode.getTargets()) {
-        expr = target.getEvalTree();
-        if (expr.getType() != EvalNode.Type.FIELD) {
-          currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
-        }
-      }
-    }
-
-    if (joinNode.hasJoinQual()) {
-      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(joinNode.getJoinQual()));
-    }
-
-    path.push(joinNode);
-    LogicalNode outer = pushProjectionRecursive(plan, context,
-        joinNode.getOuterNode(), path, currentRequired);
-    LogicalNode inner = pushProjectionRecursive(plan, context,
-        joinNode.getInnerNode(), path, currentRequired);
-    path.pop();
-
-    Schema merged = SchemaUtil.merge(outer.getOutSchema(), inner.getOutSchema());
-    joinNode.setInSchema(merged);
-    pushDownProjectablePost(context, joinNode, upperRequired, isTopmostProjectable(path));
-
-    return joinNode;
-  }
-
-  private static BinaryNode pushDownSetNode(LogicalPlan plan, OptimizationContext context, UnionNode node,
-                                            Set<Column> upperRequired, Stack<LogicalNode> stack) throws PlanningException {
-    BinaryNode setNode = node;
-
-    LogicalPlan.QueryBlock leftBlock = plan.getBlock(setNode.getOuterNode());
-    OptimizationContext leftCtx = new OptimizationContext(plan,
-        leftBlock.getTargetListManager().getUnEvaluatedTargets());
-    LogicalPlan.QueryBlock rightBlock = plan.getBlock(setNode.getInnerNode());
-    OptimizationContext rightCtx = new OptimizationContext(plan,
-        rightBlock.getTargetListManager().getUnEvaluatedTargets());
-
-    stack.push(setNode);
-    pushProjectionRecursive(plan, leftCtx, setNode.getOuterNode(), stack, upperRequired);
-    pushProjectionRecursive(plan, rightCtx, setNode.getInnerNode(), stack, upperRequired);
-    stack.pop();
-
-    // if this is the final union, we assume that all targets are evalauted.
-    // TODO - is it always correct?
-    if (stack.peek().getType() != ExprType.UNION) {
-      context.getTargetListManager().setEvaluatedAll();
-    }
-
-    return setNode;
-  }
-
-  private static ScanNode pushdownScanNode(OptimizationContext optContext, ScanNode scanNode,
-                                           Set<Column> upperRequired, Stack<LogicalNode> stack)
-      throws PlanningException {
-    return (ScanNode) pushDownProjectablePost(optContext, scanNode, upperRequired, isTopmostProjectable(stack));
-  }
-
-  private static boolean isTopmostProjectable(Stack<LogicalNode> stack) {
-    for (LogicalNode node : stack) {
-      if (node.getType() == ExprType.JOIN || node.getType() == ExprType.GROUP_BY) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private static LogicalNode pushDownProjectablePost(OptimizationContext context, LogicalNode node,
-                                                     Set<Column> upperRequired, boolean last)
-      throws PlanningException {
-    TargetListManager targetListManager = context.getTargetListManager();
-    EvalNode expr;
-
-    List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
-
-    for (int i = 0; i < targetListManager.size(); i++) {
-      expr = targetListManager.getTarget(i).getEvalTree();
-
-      if (!targetListManager.isEvaluated(i) && PlannerUtil.canBeEvaluated(expr, node)) {
-
-        if (node instanceof ScanNode) { // For ScanNode
-
-          if (expr.getType() == EvalNode.Type.FIELD && !targetListManager.getTarget(i).hasAlias()) {
-            targetListManager.setEvaluated(i);
-          } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
-            targetListManager.setEvaluated(i);
-            newEvaluatedTargetIds.add(i);
-          }
-
-        } else if (node instanceof GroupbyNode) { // For GroupBy
-          if (EvalTreeUtil.findDistinctAggFunction(expr).size() > 0 && expr.getType() != EvalNode.Type.FIELD) {
-            targetListManager.setEvaluated(i);
-            newEvaluatedTargetIds.add(i);
-          }
-
-        } else if (node instanceof JoinNode) {
-          if (expr.getType() != EvalNode.Type.FIELD && EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
-            targetListManager.setEvaluated(i);
-            newEvaluatedTargetIds.add(i);
-          }
-        }
-      }
-    }
-
-    Projectable projectable = (Projectable) node;
-    if (last) {
-      Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated.");
-      projectable.setTargets(targetListManager.getTargets());
-      targetListManager.getUpdatedTarget();
-      node.setOutSchema(targetListManager.getUpdatedSchema());
-    } else {
-    // Preparing targets regardless of that the node has targets or not.
-    // This part is required because some node does not have any targets,
-    // if the node has the same input and output schemas.
-
-    Target [] checkingTargets;
-    if (!projectable.hasTargets()) {
-      Schema outSchema = node.getOutSchema();
-      checkingTargets = new Target[outSchema.getColumnNum() + newEvaluatedTargetIds.size()];
-      PlannerUtil.schemaToTargets(outSchema, checkingTargets);
-      int baseIdx = outSchema.getColumnNum();
-      for (int i = 0; i < newEvaluatedTargetIds.size(); i++) {
-        checkingTargets[baseIdx + i] = targetListManager.getTarget(newEvaluatedTargetIds.get(i));
-      }
-    } else {
-      checkingTargets = projectable.getTargets();
-    }
-
-    List<Target> projectedTargets = new ArrayList<Target>();
-    for (Column column : upperRequired) {
-      for (Target target : checkingTargets) {
-
-        if (target.hasAlias() && target.getAlias().equalsIgnoreCase(column.getQualifiedName())) {
-            projectedTargets.add(target);
-        } else {
-
-          if (target.getColumnSchema().equals(column)) {
-            projectedTargets.add(target);
-          }
-        }
-      }
-    }
-
-    projectable.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
-    targetListManager.getUpdatedTarget();
-    node.setOutSchema(PlannerUtil.targetToSchema(projectable.getTargets()));
-    }
+  public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
+    rewriteEngine.rewrite(plan);
 
-    return node;
+    return plan.getRootBlock().getRoot();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
new file mode 100644
index 0000000..339824f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.Stack;
+
+public interface LogicalPlanVisitor <T> {
+  LogicalNode visitRoot(LogicalPlan plan, LogicalRootNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitGroupBy(LogicalPlan plan, GroupbyNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitFilter(LogicalPlan plan, SelectionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitJoin(LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitExcept(LogicalPlan plan, ExceptNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitIntersect(LogicalPlan plan, IntersectNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+  LogicalNode visitStoreTable(LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack, T data)
+      throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index dab58b9..b8cfb70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -330,7 +330,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // Strip the table names from the targets of the both blocks
     // in order to check the equivalence the schemas of both blocks.
-    Target[] leftStrippedTargets = PlannerUtil.stripTarget(leftContext.block.getCurrentTargets());
+    Target [] leftStrippedTargets = PlannerUtil.stripTarget(leftContext.block.getCurrentTargets());
 
     Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
     setOp.setInSchema(left.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
index d30c047..5f20f3e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
@@ -100,8 +100,9 @@ public class TargetListManager {
     return evaluatedFlags[id];
   }
 
-  public Target[] getUpdatedTarget() throws PlanningException {
-    Target[] updated = new Target[targets.length];
+  public Target [] getUpdatedTarget() throws PlanningException {
+    Target [] updated = new Target[targets.length];
+
     for (int i = 0; i < targets.length; i++) {
       if (targets[i] == null) { // if it is not created
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
new file mode 100644
index 0000000..d5e427c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.exception.InvalidQueryException;
+
+import java.util.List;
+import java.util.Stack;
+
+public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>> implements RewriteRule {
+  private static final String NAME = "FilterPushDown";
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
+
+    return PlannerUtil.findTopNode(toBeOptimized, ExprType.SELECTION) != null;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalNode root = plan.getRootBlock().getRoot();
+    SelectionNode selNode = (SelectionNode) PlannerUtil.findTopNode(root, ExprType.SELECTION);
+    Preconditions.checkNotNull(selNode);
+
+    Stack<LogicalNode> stack = new Stack<LogicalNode>();
+    EvalNode [] cnf = EvalTreeUtil.getConjNormalForm(selNode.getQual());
+
+    visitChild(plan, root, stack, Lists.newArrayList(cnf));
+    return plan;
+  }
+
+  public LogicalNode visitFilter(LogicalPlan plan, SelectionNode selNode, Stack<LogicalNode> stack, List<EvalNode> cnf)
+      throws PlanningException {
+    stack.push(selNode);
+    visitChild(plan, selNode.getSubNode(), stack, cnf);
+    stack.pop();
+
+    // remove the selection operator if there is no search condition
+    // after selection push.
+    if(cnf.size() == 0) {
+      LogicalNode node = stack.peek();
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        unary.setSubNode(selNode.getSubNode());
+      } else {
+        throw new InvalidQueryException("Unexpected Logical Query Plan");
+      }
+    }
+
+    return selNode;
+  }
+
+  public LogicalNode visitJoin(LogicalPlan plan, JoinNode joinNode, Stack<LogicalNode> stack, List<EvalNode> cnf)
+      throws PlanningException {
+    LogicalNode outer = joinNode.getOuterNode();
+    LogicalNode inner = joinNode.getInnerNode();
+
+    visitChild(plan, outer, stack, cnf);
+    visitChild(plan, inner, stack, cnf);
+
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : cnf) {
+      if (PlannerUtil.canBeEvaluated(eval, joinNode)) {
+        matched.add(eval);
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = EvalTreeUtil.transformCNF2Singleton(
+          matched.toArray(new EvalNode[matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.get(0);
+    }
+
+    if (qual != null) {
+      if (joinNode.hasJoinQual()) {
+        EvalNode conjQual = EvalTreeUtil.
+            transformCNF2Singleton(joinNode.getJoinQual(), qual);
+        joinNode.setJoinQual(conjQual);
+      } else {
+        joinNode.setJoinQual(qual);
+      }
+      if (joinNode.getJoinType() == JoinType.CROSS_JOIN) {
+        joinNode.setJoinType(JoinType.INNER);
+      }
+      cnf.removeAll(matched);
+    }
+
+    return joinNode;
+  }
+
+  public LogicalNode visitScan(LogicalPlan plan, ScanNode scanNode, Stack<LogicalNode> stack, List<EvalNode> cnf)
+      throws PlanningException {
+
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : cnf) {
+      if (PlannerUtil.canBeEvaluated(eval, scanNode)) {
+        matched.add(eval);
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = EvalTreeUtil.transformCNF2Singleton(
+          matched.toArray(new EvalNode [matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.get(0);
+    }
+
+    if (qual != null) { // if a matched qual exists
+      scanNode.setQual(qual);
+    }
+
+    cnf.removeAll(matched);
+
+    return scanNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
new file mode 100644
index 0000000..46740b5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+import java.util.*;
+
+public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPushDownRule.PushDownContext>
+    implements RewriteRule {
+  /** Class Logger */
+  private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class);
+  private static final String name = "ProjectionPushDown";
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
+
+    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
+      LOG.info("This query skips the logical optimization step.");
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    Stack<LogicalNode> stack = new Stack<LogicalNode>();
+
+    PushDownContext context = new PushDownContext();
+    context.plan = plan;
+
+    if (plan.getRootBlock().getProjection() != null &&
+        plan.getRootBlock().getProjection().isAllProjected()) {
+      context.targetListManager = new TargetListManager(plan,
+          plan.getRootBlock().getProjectionNode().getTargets());
+    } else {
+      context.targetListManager= new TargetListManager(plan, LogicalPlan.ROOT_BLOCK);
+    }
+
+    context.upperRequired = new HashSet<Column>(plan.getRootBlock().getSchema().getColumns());
+
+
+    visitChild(plan, plan.getRootBlock().getRoot(), stack, context);
+
+    return plan;
+  }
+
+  public static class PushDownContext {
+    LogicalPlan plan;
+    TargetListManager targetListManager;
+    Set<Column> upperRequired;
+
+    public PushDownContext() {
+    }
+
+    public PushDownContext(ProjectionPushDownRule.PushDownContext context) {
+      this.plan = context.plan;
+      this.targetListManager = context.targetListManager;
+      this.upperRequired = context.upperRequired;
+    }
+  }
+
+  @Override
+  public LogicalNode visitRoot(LogicalPlan plan, LogicalRootNode node, Stack<LogicalNode> stack,
+                                   PushDownContext context) throws PlanningException {
+    return pushDownCommonPost(context, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack,
+                                        PushDownContext context) throws PlanningException {
+    for (Target target : node.getTargets()) {
+      context.upperRequired.add(target.getColumnSchema());
+    }
+
+    stack.push(node);
+    LogicalNode child = visitChild(plan, node.getSubNode(), stack, context);
+    stack.pop();
+
+    LogicalNode childNode = node.getSubNode();
+
+    // If all expressions are evaluated in the child operators and the last operator is projectable,
+    // ProjectionNode will not be necessary. It eliminates ProjectionNode.
+    if (context.targetListManager.isAllEvaluated() && (childNode instanceof Projectable)) {
+      LogicalNode parent = stack.peek();
+      // update the child node's output schemas
+      child.setOutSchema(context.targetListManager.getUpdatedSchema());
+      PlannerUtil.deleteNode(parent, node);
+      return child;
+    } else {
+      node.setInSchema(child.getOutSchema());
+      node.setTargets(context.targetListManager.getUpdatedTarget());
+      return node;
+    }
+  }
+
+  @Override
+  public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    return pushDownCommonPost(context, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    for (SortSpec spec : node.getSortKeys()) {
+      context.upperRequired.add(spec.getSortKey());
+    }
+
+    return pushDownCommonPost(context, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(LogicalPlan plan, GroupbyNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    Set<Column> currentRequired = new HashSet<Column>(context.upperRequired);
+
+    if (node.hasHavingCondition()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(node.getHavingCondition()));
+    }
+
+    for (Target target : node.getTargets()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
+    }
+
+    PushDownContext groupByContext = new PushDownContext(context);
+    groupByContext.upperRequired = currentRequired;
+    return pushDownCommonPost(groupByContext, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitFilter(LogicalPlan plan, SelectionNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    if (node.getQual() != null) {
+      context.upperRequired.addAll(EvalTreeUtil.findDistinctRefColumns(node.getQual()));
+    }
+
+    return pushDownCommonPost(context, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitJoin(LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    Set<Column> currentRequired = Sets.newHashSet(context.upperRequired);
+
+    if (node.hasTargets()) {
+      EvalNode expr;
+      for (Target target : node.getTargets()) {
+        expr = target.getEvalTree();
+        if (expr.getType() != EvalNode.Type.FIELD) {
+          currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
+        }
+      }
+    }
+
+    if (node.hasJoinQual()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(node.getJoinQual()));
+    }
+
+    PushDownContext leftContext = new PushDownContext(context);
+    PushDownContext rightContext = new PushDownContext(context);
+    leftContext.upperRequired = currentRequired;
+    rightContext.upperRequired = currentRequired;
+
+    stack.push(node);
+    LogicalNode outer = visitChild(plan, node.getOuterNode(), stack, leftContext);
+    LogicalNode inner = visitChild(plan, node.getInnerNode(), stack, rightContext);
+    stack.pop();
+
+    Schema merged = SchemaUtil.merge(outer.getOutSchema(), inner.getOutSchema());
+    node.setInSchema(merged);
+    pushDownProjectablePost(context, node, isTopmostProjectable(stack));
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    return pushDownSetNode(plan, node, stack, context);
+  }
+
+  @Override
+  public LogicalNode visitExcept(LogicalPlan plan, ExceptNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    return pushDownSetNode(plan, node, stack, context);
+  }
+
+  @Override
+  public LogicalNode visitIntersect(LogicalPlan plan, IntersectNode node, Stack<LogicalNode> stack,
+                                      PushDownContext context) throws PlanningException {
+    return pushDownSetNode(plan, node, stack, context);
+  }
+
+  @Override
+  public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, PushDownContext context)
+      throws PlanningException {
+    return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
+  }
+
+  @Override
+  public LogicalNode visitStoreTable(LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack,
+                                        PushDownContext context) throws PlanningException {
+    return pushDownCommonPost(context, node, stack);
+  }
+
+  private LogicalNode pushDownCommonPost(PushDownContext context, UnaryNode node, Stack<LogicalNode> stack)
+      throws PlanningException {
+
+    stack.push(node);
+    LogicalNode child = visitChild(context.plan, node.getSubNode(), stack, context);
+    stack.pop();
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+
+    if (node instanceof Projectable) {
+      pushDownProjectablePost(context, node, isTopmostProjectable(stack));
+    }
+    return node;
+  }
+
+  private static LogicalNode pushDownProjectablePost(PushDownContext context, LogicalNode node, boolean last)
+      throws PlanningException {
+    TargetListManager targetListManager = context.targetListManager;
+    EvalNode expr;
+
+    List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
+
+    for (int i = 0; i < targetListManager.size(); i++) {
+      expr = targetListManager.getTarget(i).getEvalTree();
+
+      if (!targetListManager.isEvaluated(i) && PlannerUtil.canBeEvaluated(expr, node)) {
+
+        if (node instanceof ScanNode) { // For ScanNode
+
+          if (expr.getType() == EvalNode.Type.FIELD && !targetListManager.getTarget(i).hasAlias()) {
+            targetListManager.setEvaluated(i);
+          } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
+
+        } else if (node instanceof GroupbyNode) { // For GroupBy
+          if (EvalTreeUtil.findDistinctAggFunction(expr).size() > 0 && expr.getType() != EvalNode.Type.FIELD) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
+
+        } else if (node instanceof JoinNode) {
+          if (expr.getType() != EvalNode.Type.FIELD && EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
+        }
+      }
+    }
+
+    Projectable projectable = (Projectable) node;
+    if (last) {
+      Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated.");
+      projectable.setTargets(targetListManager.getTargets());
+      targetListManager.getUpdatedTarget();
+      node.setOutSchema(targetListManager.getUpdatedSchema());
+    } else {
+      // Preparing targets regardless of that the node has targets or not.
+      // This part is required because some node does not have any targets,
+      // if the node has the same input and output schemas.
+
+      Target[] checkingTargets;
+      if (!projectable.hasTargets()) {
+        Schema outSchema = node.getOutSchema();
+        checkingTargets = new Target[outSchema.getColumnNum() + newEvaluatedTargetIds.size()];
+        PlannerUtil.schemaToTargets(outSchema, checkingTargets);
+        int baseIdx = outSchema.getColumnNum();
+        for (int i = 0; i < newEvaluatedTargetIds.size(); i++) {
+          checkingTargets[baseIdx + i] = targetListManager.getTarget(newEvaluatedTargetIds.get(i));
+        }
+      } else {
+        checkingTargets = projectable.getTargets();
+      }
+
+      List<Target> projectedTargets = new ArrayList<Target>();
+      for (Column column : context.upperRequired) {
+        for (Target target : checkingTargets) {
+
+          if (target.hasAlias() && target.getAlias().equalsIgnoreCase(column.getQualifiedName())) {
+            projectedTargets.add(target);
+          } else {
+
+            if (target.getColumnSchema().equals(column)) {
+              projectedTargets.add(target);
+            }
+          }
+        }
+      }
+
+      projectable.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+      targetListManager.getUpdatedTarget();
+      node.setOutSchema(PlannerUtil.targetToSchema(projectable.getTargets()));
+    }
+
+    return node;
+  }
+
+  private static boolean isTopmostProjectable(Stack<LogicalNode> stack) {
+    for (LogicalNode node : stack) {
+      if (node.getType() == ExprType.JOIN || node.getType() == ExprType.GROUP_BY) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private BinaryNode pushDownSetNode(LogicalPlan plan, BinaryNode setNode, Stack<LogicalNode> stack,
+                                            PushDownContext context) throws PlanningException {
+
+    LogicalPlan.QueryBlock leftBlock = plan.getBlock(setNode.getOuterNode());
+    PushDownContext leftContext = new PushDownContext(context);
+    leftContext.targetListManager = new TargetListManager(plan,
+        leftBlock.getTargetListManager().getUnEvaluatedTargets());
+
+    LogicalPlan.QueryBlock rightBlock = plan.getBlock(setNode.getInnerNode());
+    PushDownContext rightContext = new PushDownContext(context);
+    rightContext.targetListManager = new TargetListManager(plan,
+        rightBlock.getTargetListManager().getUnEvaluatedTargets());
+
+
+    stack.push(setNode);
+    visitChild(plan, setNode.getOuterNode(), stack, leftContext);
+    visitChild(plan, setNode.getInnerNode(), stack, rightContext);
+    stack.pop();
+
+    // if this is the final union, we assume that all targets are evalauted.
+    // TODO - is it always correct?
+    if (stack.peek().getType() != ExprType.UNION) {
+      context.targetListManager.setEvaluatedAll();
+    }
+
+    return setNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 6ea3c8b..2f4f12f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -50,11 +50,6 @@ import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.GlobalOptimizer;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.logical.CreateTableNode;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
@@ -76,6 +71,7 @@ public class GlobalEngine extends AbstractService {
   private SQLAnalyzer analyzer;
   private CatalogService catalog;
   private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
   private GlobalPlanner globalPlanner;
   private GlobalOptimizer globalOptimizer;
 
@@ -95,6 +91,7 @@ public class GlobalEngine extends AbstractService {
       connectYarnClient();
       analyzer = new SQLAnalyzer();
       planner = new LogicalPlanner(context.getCatalog());
+      optimizer = new LogicalOptimizer();
 
       globalPlanner = new GlobalPlanner(context.getConf(), context.getCatalog(),
           sm, context.getEventHandler());
@@ -228,7 +225,7 @@ public class GlobalEngine extends AbstractService {
     LogicalPlan plan = planner.createPlan(expression);
     LogicalNode optimizedPlan = null;
     try {
-      optimizedPlan = LogicalOptimizer.optimize(plan);
+      optimizedPlan = optimizer.optimize(plan);
     } catch (PlanningException e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 77f665f..b22e893 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -98,11 +98,14 @@ public class BackendTestingUtil {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+
   public BackendTestingUtil(TajoConf conf) throws IOException {
     this.conf = conf;
     this.catalog = new LocalCatalog(conf);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
   }
 
   public ResultSet run(String [] tableNames, File [] tables, Schema [] schemas, String query)
@@ -122,7 +125,7 @@ public class BackendTestingUtil {
         frags.toArray(new Fragment[frags.size()]), workDir);
     Expr EXPR = analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(EXPR);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index def2506..aa2f77c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -64,6 +64,7 @@ public class TestGlobalQueryPlanner {
   private static Schema schema;
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner logicalPlanner;
+  private static LogicalOptimizer optimizer;
   private static QueryId queryId;
   private static StorageManager sm;
 
@@ -103,6 +104,7 @@ public class TestGlobalQueryPlanner {
         dispatcher.getEventHandler());
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
 
     int tbNum = 2;
     int tupleNum;
@@ -150,7 +152,7 @@ public class TestGlobalQueryPlanner {
         "select age, sumtest(salary) from table0");
 
     LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
 
     MasterPlan globalPlan = planner.build(queryId,
@@ -171,7 +173,7 @@ public class TestGlobalQueryPlanner {
     Expr context = analyzer.parse(
         "create table store1 as select age, sumtest(salary) from table0 group by age");
     LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
 
@@ -209,7 +211,7 @@ public class TestGlobalQueryPlanner {
     Expr context = analyzer.parse(
         "create table store1 as select age from table0 order by age");
     LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
 
@@ -252,7 +254,7 @@ public class TestGlobalQueryPlanner {
         "select table0.age,table0.salary,table1.salary from table0,table1 " +
             "where table0.salary = table1.salary order by table0.age");
     LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
@@ -319,7 +321,7 @@ public class TestGlobalQueryPlanner {
     String query = "select table0.name, table1.salary from table0,table1 where table0.name = table1.name and table1.salary > 10";
     Expr context = analyzer.parse(query);
     LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
     
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
 
@@ -342,7 +344,7 @@ public class TestGlobalQueryPlanner {
     Expr expr = analyzer.parse(
         "select age, sum(salary) from table0 group by cube (age, id)");
     LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
index 344bbfe..175285e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalNode.java
@@ -23,8 +23,7 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import static org.junit.Assert.assertEquals;
 
 public class TestLogicalNode {
-  public static final void testCloneLogicalNode(LogicalNode n1) 
-      throws CloneNotSupportedException {
+  public static final void testCloneLogicalNode(LogicalNode n1) throws CloneNotSupportedException {
     LogicalNode copy = (LogicalNode) n1.clone();
     assertEquals(n1, copy);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index d55b9e4..ec1d288 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -41,6 +41,7 @@ public class TestLogicalOptimizer {
   private static CatalogService catalog;
   private static SQLAnalyzer sqlAnalyzer;
   private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -87,6 +88,7 @@ public class TestLogicalOptimizer {
     catalog.registerFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
   }
 
   @AfterClass
@@ -119,7 +121,7 @@ public class TestLogicalOptimizer {
     assertEquals(ExprType.SCAN, joinNode.getOuterNode().getType());
     assertEquals(ExprType.SCAN, joinNode.getInnerNode().getType());
     
-    LogicalNode optimized = LogicalOptimizer.optimize(newPlan);
+    LogicalNode optimized = optimizer.optimize(newPlan);
 
     assertEquals(ExprType.ROOT, optimized.getType());
     root = (LogicalRootNode) optimized;
@@ -135,7 +137,7 @@ public class TestLogicalOptimizer {
     // two relations
     Expr expr = sqlAnalyzer.parse(QUERIES[5]);
     LogicalPlan newPlan = planner.createPlan(expr);
-    LogicalOptimizer.optimize(newPlan);
+    optimizer.optimize(newPlan);
   }
   
   @Test
@@ -154,7 +156,7 @@ public class TestLogicalOptimizer {
     SelectionNode selNode = (SelectionNode) projNode.getSubNode();    
     assertEquals(ExprType.SCAN, selNode.getSubNode().getType());        
     
-    LogicalNode optimized = LogicalOptimizer.optimize(newPlan);
+    LogicalNode optimized = optimizer.optimize(newPlan);
     assertEquals(ExprType.ROOT, optimized.getType());
     root = (LogicalRootNode) optimized;
     TestLogicalNode.testCloneLogicalNode(root);
@@ -177,8 +179,9 @@ public class TestLogicalOptimizer {
     assertEquals(ExprType.SELECTION, groupbyNode.getSubNode().getType());
     SelectionNode selNode = (SelectionNode) groupbyNode.getSubNode();
     assertEquals(ExprType.SCAN, selNode.getSubNode().getType());
-    
-    LogicalNode optimized = LogicalOptimizer.optimize(newPlan);
+
+    System.out.println(newPlan.getRootBlock().getRoot());
+    LogicalNode optimized = optimizer.optimize(newPlan);
     assertEquals(ExprType.ROOT, optimized.getType());
     root = (LogicalRootNode) optimized;
     TestLogicalNode.testCloneLogicalNode(root);
@@ -212,7 +215,7 @@ public class TestLogicalOptimizer {
     assertTrue(PlannerUtil.canBeEvaluated(selNode.getQual(), joinNode));
     
     // Optimized plan
-    LogicalNode optimized = LogicalOptimizer.optimize(newPlan);
+    LogicalNode optimized = optimizer.optimize(newPlan);
     assertEquals(ExprType.ROOT, optimized.getType());
     root = (LogicalRootNode) optimized;
     

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index 1d2b0c1..c2e232b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -62,6 +62,7 @@ public class TestGlobalQueryOptimizer {
   private static Schema schema;
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner logicalPlanner;
+  private static LogicalOptimizer logicalOptimizer;
   private static QueryId queryId;
   private static GlobalOptimizer optimizer;
 
@@ -94,6 +95,7 @@ public class TestGlobalQueryOptimizer {
         dispatcher.getEventHandler());
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
+    logicalOptimizer = new LogicalOptimizer();
 
     int tbNum = 2;
     int tupleNum;
@@ -141,7 +143,7 @@ public class TestGlobalQueryOptimizer {
     Expr expr = analyzer.parse(
         "select table0.age,table0.salary,table1.salary from table0,table1 where table0.salary = table1.salary order by table0.age");
     LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = logicalOptimizer.optimize(plan);
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
     globalPlan = optimizer.optimize(globalPlan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index f144ad3..c5b1c05 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -19,25 +19,27 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 0160c11..1c651f6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -21,26 +21,29 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -55,6 +58,7 @@ public class TestBSTIndexExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
   private StorageManager sm;
   private Schema idxSchema;
   private TupleComparator comp;
@@ -136,6 +140,7 @@ public class TestBSTIndexExec {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
   }
 
   @After
@@ -155,7 +160,7 @@ public class TestBSTIndexExec {
         TUtil.newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir);
     Expr expr = analyzer.parse(QUERY);
     LogicalPlan plan = planner.createPlan(expr);
-    LogicalNode rootNode = LogicalOptimizer.optimize(plan);
+    LogicalNode rootNode = optimizer.optimize(plan);
 
     TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index ada850b..e7344d4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -29,7 +29,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cc2f5c8e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 0df67fc..591f431 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -29,7 +29,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;