You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/29 06:03:39 UTC

[10/13] TAJO-87: Integration of tajo algebra module and SQL parser

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
new file mode 100644
index 0000000..92d0336
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -0,0 +1,900 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.tree.TerminalNode;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.Aggregation.GroupType;
+import org.apache.tajo.algebra.LiteralValue.LiteralType;
+import org.apache.tajo.engine.parser.SQLParser.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.algebra.Aggregation.GroupElement;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.apache.tajo.engine.parser.SQLParser.*;
+
+public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
+  private SQLParser parser;
+
+  /** Creates an analyzer; the parser field is only assigned once {@link #parse(String)} runs. */
+  public SQLAnalyzer() {
+    super();
+  }
+
+  /**
+   * Parses a SQL string and converts the resulting parse tree into an algebra expression.
+   *
+   * @param sql the SQL text to analyze
+   * @return the expression produced by visiting the {@code sql} root rule
+   */
+  public Expr parse(String sql) {
+    CommonTokenStream tokenStream =
+        new CommonTokenStream(new SQLLexer(new ANTLRInputStream(sql)));
+
+    // The parser is stored on the instance before it is configured and run.
+    this.parser = new SQLParser(tokenStream);
+    this.parser.setBuildParseTree(true);
+
+    SqlContext root = this.parser.sql();
+    return visitSql(root);
+  }
+
+  /**
+   * Visits the root rule by delegating to its statement child.
+   *
+   * @param ctx parse-tree node of the {@code sql} rule
+   * @return the expression built for the wrapped statement
+   */
+  @Override
+  public Expr visitSql(SqlContext ctx) {
+    final Expr statement = visit(ctx.statement());
+    return statement;
+  }
+
+  /**
+   * Builds a left-deep chain of UNION / EXCEPT set operations.
+   *
+   * <p>The loop reads the children after the leading term as repeated groups of
+   * {@code operator [qualifier] query_term}. Each group wraps the tree built so far as the
+   * left operand of a new {@link SetOperation}.
+   *
+   * @param ctx parse-tree node of a non-join query expression
+   * @return the leading term alone when it is the only child, otherwise the outermost
+   *         {@link SetOperation}
+   * @throws IllegalStateException if the child at an operator position is not a token
+   */
+  @Override
+  public Expr visitNon_join_query_expression(SQLParser.Non_join_query_expressionContext ctx) {
+    Expr current = visitNon_join_query_term(ctx.non_join_query_term());
+
+    int i = 1;
+    while (i < ctx.getChildCount()) {
+      if (!(ctx.getChild(i) instanceof TerminalNode)) {
+        // The index was never advanced for this case before, so the loop would spin forever.
+        throw new IllegalStateException(
+            "Expected a set operator at child " + i + " but found: " + ctx.getChild(i).getText());
+      }
+
+      int idx = i;
+
+      // Any operator token other than UNION is treated as EXCEPT.
+      OpType operatorType;
+      if (((TerminalNode) ctx.getChild(idx)).getSymbol().getType() == UNION) {
+        operatorType = OpType.Union;
+      } else {
+        operatorType = OpType.Except;
+      }
+      idx++;
+
+      // An optional qualifier token may follow the operator; only ALL clears the distinct flag.
+      boolean distinct = true;
+      if (ctx.getChild(idx) instanceof TerminalNode) {
+        if (((TerminalNode) ctx.getChild(idx)).getSymbol().getType() == ALL) {
+          distinct = false;
+        }
+        idx++;
+      }
+
+      SQLParser.Query_termContext queryTermContext =
+          (SQLParser.Query_termContext) ctx.getChild(idx);
+      Expr right = visitQuery_term(queryTermContext);
+
+      current = new SetOperation(operatorType, current, right, distinct);
+
+      // Resume right after the query term just consumed. The former "i += idx" added an
+      // absolute child position to the index, so the third and later operators of a chain
+      // such as "a UNION b UNION c UNION d" were silently skipped.
+      i = idx + 1;
+    }
+
+    return current;
+  }
+
+  /**
+   * Builds a left-deep chain of INTERSECT set operations.
+   *
+   * <p>The loop reads the children after the leading primary as repeated groups of
+   * {@code operator [qualifier] query_primary}. Each group wraps the tree built so far as the
+   * left operand of a new {@link SetOperation} of type {@code OpType.Intersect}.
+   *
+   * @param ctx parse-tree node of a non-join query term
+   * @return the leading primary alone when no operator follows, otherwise the outermost
+   *         {@link SetOperation}
+   * @throws IllegalStateException if the child at an operator position is not a token
+   */
+  @Override
+  public Expr visitNon_join_query_term(Non_join_query_termContext ctx) {
+    Expr current = visitNon_join_query_primary(ctx.non_join_query_primary());
+
+    int i = 1;
+    while (i < ctx.getChildCount()) {
+      if (!(ctx.getChild(i) instanceof TerminalNode)) {
+        // The index was never advanced for this case before, so the loop would spin forever.
+        throw new IllegalStateException(
+            "Expected INTERSECT at child " + i + " but found: " + ctx.getChild(i).getText());
+      }
+
+      // Step over the operator token; this method only ever builds INTERSECT operations.
+      int idx = i + 1;
+
+      // An optional qualifier token may follow. It is skipped whether or not it is ALL, so the
+      // next child is always the operand; before, a non-ALL qualifier was left in place and
+      // then cast to Query_primaryContext.
+      boolean distinct = true;
+      if (ctx.getChild(idx) instanceof TerminalNode) {
+        if (((TerminalNode) ctx.getChild(idx)).getSymbol().getType() == ALL) {
+          distinct = false;
+        }
+        idx++;
+      }
+
+      Query_primaryContext queryPrimaryContext = (Query_primaryContext) ctx.getChild(idx);
+      Expr right = visitQuery_primary(queryPrimaryContext);
+
+      current = new SetOperation(OpType.Intersect, current, right, distinct);
+
+      // Resume right after the operand just consumed. The former "i += idx" added an absolute
+      // child position to the index, which skipped the third and later operators of a chain.
+      i = idx + 1;
+    }
+
+    return current;
+  }
+
+  /**
+   * Assembles the operator tree of a single SELECT block.
+   *
+   * <p>When a table expression is present the tree is stacked bottom-up in this order: FROM,
+   * WHERE, GROUP BY (with HAVING attached to the aggregation), ORDER BY, LIMIT. A
+   * {@link Projection} is always placed on top. HAVING is only looked at when a GROUP BY
+   * clause exists.
+   *
+   * @param ctx parse-tree node of a query specification
+   * @return the {@link Projection} at the root of the built tree
+   */
+  @Override
+  public Expr visitQuery_specification(SQLParser.Query_specificationContext ctx) {
+    final SQLParser.Table_expressionContext tableExpr = ctx.table_expression();
+    Expr child = null;
+
+    if (tableExpr != null) {
+      child = visitFrom_clause(tableExpr.from_clause());
+
+      if (tableExpr.where_clause() != null) {
+        Selection filter = visitWhere_clause(tableExpr.where_clause());
+        filter.setChild(child);
+        child = filter;
+      }
+
+      if (tableExpr.groupby_clause() != null) {
+        Aggregation grouping = visitGroupby_clause(tableExpr.groupby_clause());
+        grouping.setChild(child);
+        child = grouping;
+
+        if (tableExpr.having_clause() != null) {
+          grouping.setHavingCondition(visitBoolean_value_expression(
+              tableExpr.having_clause().boolean_value_expression()));
+        }
+      }
+
+      if (tableExpr.orderby_clause() != null) {
+        Sort ordering = visitOrderby_clause(tableExpr.orderby_clause());
+        ordering.setChild(child);
+        child = ordering;
+      }
+
+      if (tableExpr.limit_clause() != null) {
+        Limit limit = visitLimit_clause(tableExpr.limit_clause());
+        limit.setChild(child);
+        child = limit;
+      }
+    }
+
+    final Projection projection = new Projection();
+
+    if (ctx.set_qualifier() != null && ctx.set_qualifier().DISTINCT() != null) {
+      projection.setDistinct();
+    }
+
+    if (ctx.select_list().MULTIPLY() == null) {
+      // Explicit select list: one target per derived column.
+      final int targetCount = ctx.select_list().derived_column().size();
+      Target[] targets = new Target[targetCount];
+      for (int pos = 0; pos < targetCount; pos++) {
+        targets[pos] = visitDerived_column(ctx.select_list().derived_column(pos));
+      }
+      projection.setTargets(targets);
+    } else {
+      // "SELECT *"
+      projection.setAll();
+    }
+
+    if (child != null) {
+      projection.setChild(child);
+    }
+
+    return projection;
+  }
+
+  /**
+   * Visits every table reference of a FROM clause.
+   *
+   * @param ctx parse-tree node of a FROM clause
+   * @return a {@link RelationList} holding one expression per table reference, in source order
+   */
+  @Override
+  public RelationList visitFrom_clause(SQLParser.From_clauseContext ctx) {
+    final int relationCount = ctx.table_reference_list().table_reference().size();
+    final Expr[] relations = new Expr[relationCount];
+
+    int pos = 0;
+    while (pos < relationCount) {
+      relations[pos] = visit(ctx.table_reference_list().table_reference(pos));
+      pos++;
+    }
+
+    return new RelationList(relations);
+  }
+
+  /**
+   * Wraps the search condition of a WHERE clause in a {@link Selection}.
+   *
+   * @param ctx parse-tree node of a WHERE clause
+   * @return a selection whose qualifier is the visited search condition; no child is set here
+   */
+  @Override
+  public Selection visitWhere_clause(SQLParser.Where_clauseContext ctx) {
+    final Expr condition = visitSearch_condition(ctx.search_condition());
+    return new Selection(condition);
+  }
+
+  /**
+   * Converts a GROUP BY clause into an {@link Aggregation}.
+   *
+   * <p>If the first grouping element is an empty grouping set, no groups are set at all.
+   * Otherwise one {@link GroupElement} slot is allocated per grouping element; a slot stays
+   * {@code null} when its element is neither an ordinary set, a rollup nor a cube.
+   *
+   * @param ctx parse-tree node of a GROUP BY clause
+   * @return the aggregation, without child or HAVING condition
+   */
+  @Override
+  public Aggregation visitGroupby_clause(SQLParser.Groupby_clauseContext ctx) {
+    final Aggregation aggregation = new Aggregation();
+
+    // Only the first element decides whether the whole clause counts as empty.
+    if (ctx.grouping_element_list().grouping_element().get(0).empty_grouping_set() != null) {
+      return aggregation;
+    }
+
+    final int elementCount = ctx.grouping_element_list().grouping_element().size();
+    final GroupElement[] elements = new GroupElement[elementCount];
+
+    for (int pos = 0; pos < elementCount; pos++) {
+      SQLParser.Grouping_elementContext each =
+          ctx.grouping_element_list().grouping_element().get(pos);
+
+      GroupType kind = null;
+      Column_reference_listContext columns = null;
+      if (each.ordinary_grouping_set() != null) {
+        kind = GroupType.OrdinaryGroup;
+        columns = each.ordinary_grouping_set().column_reference_list();
+      } else if (each.rollup_list() != null) {
+        kind = GroupType.Rollup;
+        columns = each.rollup_list().c.column_reference_list();
+      } else if (each.cube_list() != null) {
+        kind = GroupType.Cube;
+        columns = each.cube_list().c.column_reference_list();
+      }
+
+      if (kind != null) {
+        elements[pos] = new GroupElement(kind, getColumnReferences(columns));
+      }
+    }
+
+    aggregation.setGroups(elements);
+    return aggregation;
+  }
+
+  /**
+   * Converts an ORDER BY clause into a {@link Sort}.
+   *
+   * <p>Each sort specifier becomes a {@code Sort.SortSpec} on its column. The spec is flagged
+   * descending only when DESC is given, and null-first only when FIRST is given.
+   *
+   * @param ctx parse-tree node of an ORDER BY clause
+   * @return the sort operator, without child
+   */
+  @Override
+  public Sort visitOrderby_clause(SQLParser.Orderby_clauseContext ctx) {
+    final int specCount = ctx.sort_specifier_list().sort_specifier().size();
+    final Sort.SortSpec[] sortSpecs = new Sort.SortSpec[specCount];
+
+    for (int pos = 0; pos < specCount; pos++) {
+      SQLParser.Sort_specifierContext specifier = ctx.sort_specifier_list().sort_specifier(pos);
+
+      Sort.SortSpec sortSpec = new Sort.SortSpec(visitColumn_reference(specifier.column));
+      sortSpecs[pos] = sortSpec;
+
+      if (specifier.order_specification() != null && specifier.order.DESC() != null) {
+        sortSpec.setDescending();
+      }
+
+      if (specifier.null_ordering() != null && specifier.null_ordering().FIRST() != null) {
+        sortSpec.setNullFirst();
+      }
+    }
+
+    return new Sort(sortSpecs);
+  }
+
+  /**
+   * Wraps the numeric expression of a LIMIT clause in a {@link Limit}.
+   *
+   * @param ctx parse-tree node of a LIMIT clause
+   * @return the limit operator, without child
+   */
+  @Override
+  public Limit visitLimit_clause(SQLParser.Limit_clauseContext ctx) {
+    final Expr fetchCount = visitNumeric_value_expression(ctx.numeric_value_expression());
+    return new Limit(fetchCount);
+  }
+
+  /**
+   * Builds a left-deep join tree: the first table primary is the leftmost leaf and each
+   * following joined table primary becomes a new join whose left side is the tree so far.
+   *
+   * @param ctx parse-tree node of a joined table
+   * @return the topmost join, or the plain table primary when there is nothing to join
+   */
+  @Override
+  public Expr visitJoined_table(SQLParser.Joined_tableContext ctx) {
+    Expr root = visitTable_primary(ctx.table_primary());
+
+    for (SQLParser.Joined_table_primaryContext joinedPrimary : ctx.joined_table_primary()) {
+      Join next = visitJoined_table_primary(joinedPrimary);
+      next.setLeft(root);
+      root = next;
+    }
+
+    return root;
+  }
+
+  /**
+   * Creates the {@link Join} for one joined table primary and sets its right side.
+   *
+   * <p>CROSS and UNION joins carry no further attributes. Every other join is FULL, LEFT or
+   * RIGHT outer when an outer join type is given and INNER otherwise; it may additionally be
+   * marked natural and receive either a join condition or a list of join columns.
+   *
+   * @param ctx parse-tree node of a joined table primary
+   * @return the join with its right operand set; the left operand is left to the caller
+   */
+  @Override
+  public Join visitJoined_table_primary(SQLParser.Joined_table_primaryContext ctx) {
+    final Join join;
+
+    if (ctx.CROSS() != null) {
+      join = new Join(JoinType.CROSS_JOIN);
+    } else if (ctx.UNION() != null) {
+      join = new Join(JoinType.UNION);
+    } else {
+      // Qualified or natural join.
+      final boolean outer =
+          ctx.join_type() != null && ctx.join_type().outer_join_type() != null;
+
+      if (!outer) {
+        join = new Join(JoinType.INNER);
+      } else {
+        Outer_join_type_part2Context side =
+            ctx.join_type().outer_join_type().outer_join_type_part2();
+        if (side.FULL() != null) {
+          join = new Join(JoinType.FULL_OUTER);
+        } else if (side.LEFT() != null) {
+          join = new Join(JoinType.LEFT_OUTER);
+        } else {
+          join = new Join(JoinType.RIGHT_OUTER);
+        }
+      }
+
+      if (ctx.NATURAL() != null) {
+        join.setNatural();
+      }
+
+      // A join specification only accompanies a qualified join.
+      if (ctx.join_specification() != null) {
+        if (ctx.join_specification().join_condition() != null) {
+          join.setQual(visitSearch_condition(
+              ctx.join_specification().join_condition().search_condition()));
+        } else if (ctx.join_specification().named_columns_join() != null) {
+          join.setJoinColumns(getColumnReferences(
+              ctx.join_specification().named_columns_join().column_reference_list()));
+        }
+      }
+    }
+
+    join.setRight(visitTable_primary(ctx.right));
+    return join;
+  }
+
+  /**
+   * Visits every column reference of a list.
+   *
+   * @param ctx parse-tree node of a column reference list
+   * @return one {@link ColumnReferenceExpr} per reference, in source order
+   */
+  private ColumnReferenceExpr [] getColumnReferences(Column_reference_listContext ctx) {
+    final int count = ctx.column_reference().size();
+    final ColumnReferenceExpr[] references = new ColumnReferenceExpr[count];
+
+    int pos = 0;
+    while (pos < count) {
+      references[pos] = visitColumn_reference(ctx.column_reference(pos));
+      pos++;
+    }
+
+    return references;
+  }
+
+  /**
+   * Converts a table primary into either a named relation or a derived table (sub-query).
+   *
+   * @param ctx parse-tree node of a table primary
+   * @return a {@link Relation} for a table or query name, a {@link TableSubQuery} for a
+   *         derived table, or {@code null} when neither alternative is present
+   */
+  @Override
+  public Expr visitTable_primary(SQLParser.Table_primaryContext ctx) {
+    if (ctx.table_or_query_name() != null) {
+      Relation relation = new Relation(ctx.table_or_query_name().getText());
+      // Test the alias token itself rather than the AS keyword. This keeps an alias that is
+      // written without AS (presumably allowed by the grammar -- TODO confirm against
+      // SQLParser.g4) and avoids dereferencing a missing alias when only AS was matched.
+      if (ctx.alias != null) {
+        relation.setAlias(ctx.alias.getText());
+      }
+      return relation;
+    }
+
+    if (ctx.derived_table() != null) {
+      return new TableSubQuery(ctx.name.getText(), visit(ctx.derived_table().table_subquery()));
+    }
+
+    return null;
+  }
+
+
+  /**
+   * Visits a sub-query by delegating to the query expression it contains.
+   *
+   * @param ctx parse-tree node of a sub-query
+   * @return the expression built for the inner query expression
+   */
+  @Override
+  public Expr visitSubquery(SQLParser.SubqueryContext ctx) {
+    final Expr inner = visitQuery_expression(ctx.query_expression());
+    return inner;
+  }
+
+  /**
+   * Converts a simple CASE expression ({@code CASE x WHEN v THEN r ...}).
+   *
+   * <p>Each WHEN value is turned into an equality test against the case operand, so the
+   * result has the same shape as a searched CASE. The operand expression object is visited
+   * once and shared by all generated comparisons.
+   *
+   * @param ctx parse-tree node of a simple CASE expression
+   * @return the predicate holding every WHEN branch and the optional ELSE result
+   */
+  @Override
+  public CaseWhenPredicate visitSimple_case(SQLParser.Simple_caseContext ctx) {
+    final Expr operand = visitBoolean_value_expression(ctx.boolean_value_expression());
+    final CaseWhenPredicate predicate = new CaseWhenPredicate();
+
+    for (Simple_when_clauseContext whenClause : ctx.simple_when_clause()) {
+      Expr candidate = visitNumeric_value_expression(whenClause.numeric_value_expression());
+      BinaryOperator equality = new BinaryOperator(OpType.Equals, operand, candidate);
+      predicate.addWhen(equality, buildCaseResult(whenClause.result()));
+    }
+
+    if (ctx.else_clause() != null) {
+      predicate.setElseResult(buildCaseResult(ctx.else_clause().result()));
+    }
+
+    return predicate;
+  }
+
+  /**
+   * Converts the result part of a CASE branch.
+   *
+   * @param result parse-tree node of a CASE result
+   * @return a {@link NullValue} for a literal NULL, otherwise the visited numeric expression
+   */
+  private Expr buildCaseResult(ResultContext result) {
+    if (result.NULL() == null) {
+      return visitNumeric_value_expression(result.numeric_value_expression());
+    }
+    return new NullValue();
+  }
+
+  /**
+   * Converts a searched CASE expression ({@code CASE WHEN cond THEN r ...}).
+   *
+   * @param ctx parse-tree node of a searched CASE expression
+   * @return the predicate holding every WHEN branch and the optional ELSE result
+   */
+  @Override
+  public CaseWhenPredicate visitSearched_case(SQLParser.Searched_caseContext ctx) {
+    final CaseWhenPredicate predicate = new CaseWhenPredicate();
+
+    for (Searched_when_clauseContext whenClause : ctx.searched_when_clause()) {
+      Expr condition = visitSearch_condition(whenClause.search_condition());
+      Expr outcome = buildCaseResult(whenClause.result());
+      predicate.addWhen(condition, outcome);
+    }
+
+    if (ctx.else_clause() != null) {
+      predicate.setElseResult(buildCaseResult(ctx.else_clause().result()));
+    }
+
+    return predicate;
+  }
+
+  /**
+   * Folds the boolean factors of a conjunction into a left-deep chain of AND operators.
+   *
+   * @param ctx parse-tree node of an AND predicate
+   * @return the single factor when there is only one, otherwise the outermost AND operator
+   */
+  @Override
+  public Expr visitAnd_predicate(SQLParser.And_predicateContext ctx) {
+    Expr conjunction = visitBoolean_factor(ctx.boolean_factor(0));
+
+    int pos = 1;
+    while (pos < ctx.boolean_factor().size()) {
+      Expr operand = visitBoolean_factor(ctx.boolean_factor(pos));
+      conjunction = new BinaryOperator(OpType.And, conjunction, operand);
+      pos++;
+    }
+
+    return conjunction;
+  }
+
+  /**
+   * Folds the AND predicates of a disjunction into a left-deep chain of OR operators.
+   *
+   * @param ctx parse-tree node of a boolean value expression
+   * @return the single AND predicate when there is only one, otherwise the outermost OR
+   *         operator
+   */
+  @Override
+  public Expr visitBoolean_value_expression(SQLParser.Boolean_value_expressionContext ctx) {
+    Expr disjunction = visitAnd_predicate(ctx.and_predicate(0));
+
+    int pos = 1;
+    while (pos < ctx.and_predicate().size()) {
+      Expr operand = visitAnd_predicate(ctx.and_predicate(pos));
+      disjunction = new BinaryOperator(OpType.Or, disjunction, operand);
+      pos++;
+    }
+
+    return disjunction;
+  }
+
+  /**
+   * Visits a boolean factor, applying negation when the NOT keyword is present.
+   *
+   * @param ctx parse-tree node of a boolean factor
+   * @return the visited boolean test, wrapped in a {@link NotExpr} if it is negated
+   */
+  @Override
+  public Expr visitBoolean_factor(SQLParser.Boolean_factorContext ctx) {
+    final Expr test = visitBoolean_test(ctx.boolean_test());
+    if (ctx.NOT() == null) {
+      return test;
+    }
+    return new NotExpr(test);
+  }
+
+  /**
+   * Visits a boolean test by delegating to its boolean primary; nothing else is inspected.
+   *
+   * @param ctx parse-tree node of a boolean test
+   * @return the expression built for the boolean primary
+   */
+  @Override
+  public Expr visitBoolean_test(SQLParser.Boolean_testContext ctx) {
+    final Expr primary = visitBoolean_primary(ctx.boolean_primary());
+    return primary;
+  }
+
+  /**
+   * Dispatches a boolean primary to the visitor of whichever alternative is present.
+   *
+   * <p>The alternatives are probed in a fixed order: predicate, numeric value expression,
+   * CASE expression, nested boolean value expression. If none is present the generic child
+   * traversal is used.
+   *
+   * @param ctx parse-tree node of a boolean primary
+   * @return the expression built for the matched alternative
+   */
+  @Override
+  public Expr visitBoolean_primary(SQLParser.Boolean_primaryContext ctx) {
+    if (ctx.predicate() != null) {
+      return visitPredicate(ctx.predicate());
+    }
+    if (ctx.numeric_value_expression() != null) {
+      return visitNumeric_value_expression(ctx.numeric_value_expression());
+    }
+    if (ctx.case_expression() != null) {
+      return visitCase_expression(ctx.case_expression());
+    }
+    if (ctx.boolean_value_expression() != null) {
+      return visitBoolean_value_expression(ctx.boolean_value_expression());
+    }
+    return visitChildren(ctx);
+  }
+
+  /**
+   * Converts a comparison into a {@link BinaryOperator}.
+   *
+   * @param ctx parse-tree node of a comparison predicate
+   * @return the operator whose type is derived from the first token of the comparison operator
+   * @throws RuntimeException if {@link #tokenToExprType(int)} does not know the token
+   */
+  @Override
+  public BinaryOperator visitComparison_predicate(SQLParser.Comparison_predicateContext ctx) {
+    final TerminalNode operatorToken = (TerminalNode) ctx.comp_op().getChild(0);
+    final OpType comparison = tokenToExprType(operatorToken.getSymbol().getType());
+
+    final Expr lhs = visitNumeric_value_expression(ctx.left);
+    final Expr rhs = visitNumeric_value_expression(ctx.right);
+
+    return new BinaryOperator(comparison, lhs, rhs);
+  }
+
+  /**
+   * Folds additive terms into a left-deep chain of plus / minus operators.
+   *
+   * <p>The children are read as {@code term (operator term)*}, i.e. operators sit at the odd
+   * child positions. Any operator token other than PLUS is treated as minus.
+   *
+   * @param ctx parse-tree node of a numeric value expression
+   * @return the single term when there is only one, otherwise the outermost operator
+   */
+  @Override
+  public Expr visitNumeric_value_expression(SQLParser.Numeric_value_expressionContext ctx) {
+    Expr result = visitTerm(ctx.term(0));
+
+    // Walk the (operator, term) pairs two children at a time.
+    for (int opPos = 1; opPos < ctx.getChildCount(); opPos += 2) {
+      TerminalNode operatorToken = (TerminalNode) ctx.getChild(opPos);
+      Expr operand = visitTerm((TermContext) ctx.getChild(opPos + 1));
+
+      boolean addition = operatorToken.getSymbol().getType() == PLUS;
+      result = addition
+          ? new BinaryOperator(OpType.Plus, result, operand)
+          : new BinaryOperator(OpType.Minus, result, operand);
+    }
+
+    return result;
+  }
+
+  /**
+   * Folds multiplicative factors into a left-deep chain of multiply / divide / modular
+   * operators.
+   *
+   * <p>The children are read as {@code primary (operator primary)*}, i.e. operators sit at
+   * the odd child positions. Any operator token other than MULTIPLY or DIVIDE is treated as
+   * modular.
+   *
+   * @param ctx parse-tree node of a term
+   * @return the single primary when there is only one, otherwise the outermost operator
+   */
+  @Override
+  public Expr visitTerm(SQLParser.TermContext ctx) {
+    Expr result = visitNumeric_primary(ctx.numeric_primary(0));
+
+    // Walk the (operator, primary) pairs two children at a time.
+    for (int opPos = 1; opPos < ctx.getChildCount(); opPos += 2) {
+      TerminalNode operatorToken = (TerminalNode) ctx.getChild(opPos);
+      Expr operand = visitNumeric_primary((Numeric_primaryContext) ctx.getChild(opPos + 1));
+
+      final int tokenType = operatorToken.getSymbol().getType();
+      if (tokenType == MULTIPLY) {
+        result = new BinaryOperator(OpType.Multiply, result, operand);
+      } else if (tokenType == DIVIDE) {
+        result = new BinaryOperator(OpType.Divide, result, operand);
+      } else {
+        result = new BinaryOperator(OpType.Modular, result, operand);
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Visits a numeric primary.
+   *
+   * @param ctx parse-tree node of a numeric primary
+   * @return the visited nested numeric value expression when one is present, otherwise the
+   *         result of the generic child traversal
+   */
+  @Override
+  public Expr visitNumeric_primary(SQLParser.Numeric_primaryContext ctx) {
+    if (ctx.numeric_value_expression() == null) {
+      return visitChildren(ctx);
+    }
+    return visitNumeric_value_expression(ctx.numeric_value_expression());
+  }
+
+  /**
+   * Maps a parser token type to the corresponding algebra operator type.
+   *
+   * @param tokenId a token type constant of {@link SQLParser}
+   * @return the matching {@link OpType}
+   * @throws RuntimeException if the token has no operator counterpart
+   */
+  public static OpType tokenToExprType(int tokenId) {
+    final OpType type;
+
+    switch (tokenId) {
+      // set operators
+      case SQLParser.UNION:     type = OpType.Union; break;
+      case SQLParser.EXCEPT:    type = OpType.Except; break;
+      case SQLParser.INTERSECT: type = OpType.Intersect; break;
+
+      // logical operators
+      case SQLParser.AND: type = OpType.And; break;
+      case SQLParser.OR:  type = OpType.Or; break;
+
+      // comparison operators
+      case SQLParser.EQUAL:     type = OpType.Equals; break;
+      case SQLParser.NOT_EQUAL: type = OpType.NotEquals; break;
+      case SQLParser.LTH:       type = OpType.LessThan; break;
+      case SQLParser.LEQ:       type = OpType.LessThanOrEquals; break;
+      case SQLParser.GTH:       type = OpType.GreaterThan; break;
+      case SQLParser.GEQ:       type = OpType.GreaterThanOrEquals; break;
+
+      // arithmetic operators
+      case SQLParser.MULTIPLY: type = OpType.Multiply; break;
+      case SQLParser.DIVIDE:   type = OpType.Divide; break;
+      case SQLParser.MODULAR:  type = OpType.Modular; break;
+      case SQLParser.PLUS:     type = OpType.Plus; break;
+      case SQLParser.MINUS:    type = OpType.Minus; break;
+
+      default:
+        throw new RuntimeException("Unknown Token Id: " + tokenId);
+    }
+
+    return type;
+  }
+
+  /**
+   * Converts an IN predicate.
+   *
+   * <p>Both sides are obtained through the generic child traversal of their sub-trees rather
+   * than through a dedicated visit method.
+   * NOTE(review): the generic traversal presumably yields the result of the last visited
+   * child only -- confirm this is intended for multi-child operands.
+   *
+   * @param ctx parse-tree node of an IN predicate
+   * @return the predicate built from the operand and the value side
+   */
+  @Override
+  public InPredicate visitIn_predicate(SQLParser.In_predicateContext ctx) {
+    final Expr operand = visitChildren(ctx.numeric_value_expression());
+    final Expr candidates = visitChildren(ctx.in_predicate_value());
+    return new InPredicate(operand, candidates);
+  }
+
+  /**
+   * Visits every expression of an IN value list.
+   *
+   * @param ctx parse-tree node of an IN value list
+   * @return a {@link ValueListExpr} holding one expression per list entry, in source order
+   */
+  @Override
+  public ValueListExpr visitIn_value_list(SQLParser.In_value_listContext ctx) {
+    final int count = ctx.numeric_value_expression().size();
+    final Expr[] values = new Expr[count];
+
+    int pos = 0;
+    while (pos < count) {
+      values[pos] = visit(ctx.numeric_value_expression(pos));
+      pos++;
+    }
+
+    return new ValueListExpr(values);
+  }
+
+  /**
+   * Visits every element expression of an array.
+   *
+   * @param ctx parse-tree node of an array
+   * @return a {@link ValueListExpr} holding one expression per element, in source order
+   */
+  @Override
+  public Expr visitArray(SQLParser.ArrayContext ctx) {
+    final int count = ctx.numeric_value_expression().size();
+    final Expr[] elements = new Expr[count];
+
+    int pos = 0;
+    while (pos < count) {
+      elements[pos] = visit(ctx.numeric_value_expression(pos));
+      pos++;
+    }
+
+    return new ValueListExpr(elements);
+  }
+
+  /**
+   * Converts a LIKE predicate on a column reference.
+   *
+   * <p>The pattern is taken from the string literal token with its first and last character
+   * (the quotes) removed.
+   *
+   * @param ctx parse-tree node of a LIKE predicate
+   * @return the predicate; its negation flag is set when the NOT keyword is present
+   */
+  @Override
+  public LikePredicate visitLike_predicate(SQLParser.Like_predicateContext ctx) {
+    final boolean negated = ctx.NOT() != null;
+
+    final ColumnReferenceExpr column = (ColumnReferenceExpr) visit(ctx.column_reference());
+
+    final String quotedPattern = ctx.Character_String_Literal().getText();
+    final Expr pattern = new LiteralValue(stripQuote(quotedPattern), LiteralType.String);
+
+    return new LikePredicate(negated, column, pattern);
+  }
+
+  /**
+   * Converts an IS [NOT] NULL predicate.
+   *
+   * <p>NOTE(review): the operand is a numeric value expression but its visit result is cast
+   * to {@link ColumnReferenceExpr} without a check; presumably only plain column references
+   * are expected here -- confirm, since any other operand would fail the cast.
+   *
+   * @param ctx parse-tree node of a NULL predicate
+   * @return the predicate; its negation flag is set when the NOT keyword is present
+   */
+  @Override
+  public IsNullPredicate visitNull_predicate(SQLParser.Null_predicateContext ctx) {
+    final boolean negated = ctx.NOT() != null;
+    final ColumnReferenceExpr operand =
+        (ColumnReferenceExpr) visit(ctx.numeric_value_expression());
+
+    return new IsNullPredicate(negated, operand);
+  }
+
+  /**
+   * Converts a column reference, optionally qualified by a table name.
+   *
+   * @param ctx parse-tree node of a column reference
+   * @return the column expression; its table name is only set when a qualifier is present
+   */
+  @Override
+  public ColumnReferenceExpr visitColumn_reference(SQLParser.Column_referenceContext ctx) {
+    final ColumnReferenceExpr reference = new ColumnReferenceExpr(ctx.name.getText());
+
+    final boolean qualified = ctx.tb_name != null;
+    if (qualified) {
+      reference.setTableName(ctx.tb_name.getText());
+    }
+
+    return reference;
+  }
+
+  /**
+   * Converts an unsigned numeric literal, keeping its source text as the value.
+   *
+   * @param ctx parse-tree node of an unsigned numerical literal
+   * @return an unsigned-integer literal when the NUMBER token is present, otherwise an
+   *         unsigned-float literal
+   */
+  @Override
+  public LiteralValue visitUnsigned_numerical_literal(SQLParser.Unsigned_numerical_literalContext ctx) {
+    if (ctx.NUMBER() == null) {
+      return new LiteralValue(ctx.getText(), LiteralType.Unsigned_Float);
+    }
+    return new LiteralValue(ctx.getText(), LiteralType.Unsigned_Integer);
+  }
+
+  /**
+   * Converts a function call.
+   *
+   * <p>{@code COUNT(*)} and COUNT with exactly one argument get dedicated expression types.
+   * Everything else becomes a generic {@link FunctionExpr} named after the identifier token,
+   * with parameters set only when an argument list is present.
+   * NOTE(review): the one-argument COUNT test dereferences the argument list, and the generic
+   * branch dereferences the identifier token, both without a null check -- confirm the
+   * grammar guarantees they are present on those paths.
+   *
+   * @param ctx parse-tree node of a routine invocation
+   * @return the function expression
+   */
+  @Override
+  public FunctionExpr visitRoutine_invocation(SQLParser.Routine_invocationContext ctx) {
+    final boolean isCount = ctx.COUNT() != null;
+
+    if (isCount && ctx.MULTIPLY() != null) {
+      return new CountRowsFunctionExpr();
+    }
+
+    if (isCount && ctx.sql_argument_list().boolean_value_expression().size() == 1) {
+      return new CountValueFunctionExpr(visitBoolean_value_expression(
+          ctx.sql_argument_list().boolean_value_expression().get(0)));
+    }
+
+    final FunctionExpr function = new FunctionExpr(ctx.Identifier().getText());
+    if (ctx.sql_argument_list() == null) {
+      return function;
+    }
+
+    final int argCount = ctx.sql_argument_list().boolean_value_expression().size();
+    final Expr[] arguments = new Expr[argCount];
+    for (int pos = 0; pos < argCount; pos++) {
+      arguments[pos] = visitBoolean_value_expression(
+          ctx.sql_argument_list().boolean_value_expression().get(pos));
+    }
+    function.setParams(arguments);
+
+    return function;
+  }
+
+  /**
+   * Converts one entry of a select list into a {@link Target}.
+   *
+   * @param ctx parse-tree node of a derived column
+   * @return the target; its alias is only set when an AS clause is present
+   */
+  @Override
+  public Target visitDerived_column(SQLParser.Derived_columnContext ctx) {
+    final Expr expression = visitBoolean_value_expression(ctx.boolean_value_expression());
+    final Target target = new Target(expression);
+
+    if (ctx.as_clause() != null) {
+      String alias = ctx.as_clause().Identifier().getText();
+      target.setAlias(alias);
+    }
+
+    return target;
+  }
+
+  /**
+   * Turns the text of the node into a string literal, dropping its first and last character.
+   *
+   * @param ctx parse-tree node of a character string type
+   * @return the string literal
+   */
+  @Override
+  public LiteralValue visitCharacter_string_type(SQLParser.Character_string_typeContext ctx) {
+    final String unquoted = stripQuote(ctx.getText());
+    return new LiteralValue(unquoted, LiteralType.String);
+  }
+
+  /**
+   * Turns the text of the node into a string literal, dropping its first and last character.
+   *
+   * @param ctx parse-tree node of a string value expression
+   * @return the string literal
+   */
+  @Override
+  public LiteralValue visitString_value_expr(SQLParser.String_value_exprContext ctx) {
+    final String unquoted = stripQuote(ctx.getText());
+    return new LiteralValue(unquoted, LiteralType.String);
+  }
+
+  /**
+   * Converts a CREATE TABLE statement.
+   *
+   * <p>An EXTERNAL table always takes column definitions, a storage type and a location. A
+   * regular table takes each of column definitions, storage type (only with USING) and
+   * source query only when the corresponding part is present. Parameters are attached to
+   * both kinds when a parameter clause exists.
+   *
+   * @param ctx parse-tree node of a CREATE TABLE statement
+   * @return the {@link CreateTable} expression
+   */
+  @Override
+  public Expr visitCreate_table_statement(SQLParser.Create_table_statementContext ctx) {
+    final CreateTable statement = new CreateTable(ctx.table_name().getText());
+
+    if (ctx.EXTERNAL() == null) {
+      if (ctx.table_elements() != null) {
+        statement.setTableElements(getDefinitions(ctx.table_elements()));
+      }
+
+      if (ctx.USING() != null) {
+        statement.setStorageType(ctx.file_type.getText());
+      }
+
+      if (ctx.query_expression() != null) {
+        statement.setSubQuery(visitQuery_expression(ctx.query_expression()));
+      }
+    } else {
+      statement.setExternal();
+
+      // Read every required part first, then apply them to the statement.
+      CreateTable.ColumnDefinition[] columns = getDefinitions(ctx.table_elements());
+      String storageType = ctx.file_type.getText();
+      String location = stripQuote(ctx.path.getText());
+
+      statement.setTableElements(columns);
+      statement.setStorageType(storageType);
+      statement.setLocation(location);
+    }
+
+    if (ctx.param_clause() != null) {
+      statement.setParams(getParams(ctx.param_clause()));
+    }
+
+    return statement;
+  }
+
+  /**
+   * Converts the field elements of a table definition into column definitions.
+   *
+   * <p>A scale is only copied when a length or precision is present as well.
+   *
+   * @param ctx parse-tree node of the table elements
+   * @return one column definition per field element, in source order
+   */
+  private CreateTable.ColumnDefinition [] getDefinitions(SQLParser.Table_elementsContext ctx) {
+    final int fieldCount = ctx.field_element().size();
+    final CreateTable.ColumnDefinition[] columns = new CreateTable.ColumnDefinition[fieldCount];
+
+    for (int pos = 0; pos < fieldCount; pos++) {
+      String columnName = ctx.field_element(pos).name.getText();
+      TypeDefinition parsedType = getDataType(ctx.field_element(pos).field_type().data_type());
+
+      CreateTable.ColumnDefinition column =
+          new CreateTable.ColumnDefinition(columnName, parsedType.getType());
+      columns[pos] = column;
+
+      Integer lengthOrPrecision = parsedType.getLengthOrPrecision();
+      if (lengthOrPrecision == null) {
+        continue;
+      }
+      column.setLengthOrPrecision(lengthOrPrecision);
+
+      Integer scale = parsedType.getScale();
+      if (scale != null) {
+        column.setScale(scale);
+      }
+    }
+
+    return columns;
+  }
+
+  /**
+   * Maps a parsed data type to a {@link TypeDefinition}.
+   *
+   * <p>The type name stored in the result is the {@code name()} of the matching
+   * {@code TajoDataTypes.Type} constant. A length (or precision) is copied when the type
+   * carries one; a scale is only read for NUMERIC and DECIMAL.
+   *
+   * @param type parse-tree node of a data type; only its predefined type is inspected
+   * @return the type definition, or {@code null} when no alternative below matches
+   */
+  private TypeDefinition getDataType(SQLParser.Data_typeContext type) {
+    SQLParser.Predefined_typeContext predefined_type = type.predefined_type();
+
+    TypeDefinition typeDefinition = null;
+    // ---- character strings: CHAR / VARCHAR / TEXT ----
+    if (predefined_type.character_string_type() != null) {
+      SQLParser.Character_string_typeContext character_string_type =
+          predefined_type.character_string_type();
+
+      // CHARACTER or CHAR without VARYING is a fixed-length CHAR.
+      if ((character_string_type.CHARACTER() != null || character_string_type.CHAR() != null) &&
+          character_string_type.VARYING() == null) {
+
+        typeDefinition = new TypeDefinition(Type.CHAR.name());
+
+        if (character_string_type.type_length() != null) {
+          typeDefinition.setLengthOrPrecision(
+              Integer.parseInt(character_string_type.type_length().NUMBER().getText()));
+        }
+
+      // VARCHAR, or any form that carries VARYING, is a VARCHAR.
+      } else if (character_string_type.VARCHAR() != null
+          || character_string_type.VARYING() != null) {
+
+        typeDefinition = new TypeDefinition(Type.VARCHAR.name());
+
+        if (character_string_type.type_length() != null) {
+          typeDefinition.setLengthOrPrecision(
+              Integer.parseInt(character_string_type.type_length().NUMBER().getText()));
+        }
+
+      } else if (character_string_type.TEXT() != null) {
+        typeDefinition =  new TypeDefinition(Type.TEXT.name());
+      }
+
+    // ---- national character strings: NCHAR / NVARCHAR ----
+    } else if (predefined_type.national_character_string_type() != null) {
+      SQLParser.National_character_string_typeContext nchar_type =
+          predefined_type.national_character_string_type();
+      if ((nchar_type.CHAR() != null || nchar_type.CHARACTER() != null
+          || nchar_type.NCHAR() != null) && nchar_type.VARYING() == null) {
+        typeDefinition = new TypeDefinition(Type.NCHAR.name());
+      } else if (nchar_type.NVARCHAR() != null || nchar_type.VARYING() != null) {
+        typeDefinition = new TypeDefinition(Type.NVARCHAR.name());
+      }
+
+      // NOTE(review): typeDefinition is dereferenced here without a null check; presumably
+      // one of the two branches above always matches -- confirm against the grammar.
+      if (nchar_type.type_length() != null) {
+        typeDefinition.setLengthOrPrecision(
+            Integer.parseInt(nchar_type.type_length().NUMBER().getText()));
+      }
+
+    // ---- binary large objects ----
+    } else if (predefined_type.binary_large_object_string_type() != null) {
+      SQLParser.Binary_large_object_string_typeContext blob_type =
+          predefined_type.binary_large_object_string_type();
+      typeDefinition = new TypeDefinition(Type.BLOB.name());
+      if (blob_type.type_length() != null) {
+        typeDefinition.setLengthOrPrecision(
+            Integer.parseInt(blob_type.type_length().NUMBER().getText()));
+      }
+    // ---- numeric types ----
+    } else if (predefined_type.numeric_type() != null) {
+      // exact number
+      if (predefined_type.numeric_type().exact_numeric_type() != null) {
+        SQLParser.Exact_numeric_typeContext exactType =
+            predefined_type.numeric_type().exact_numeric_type();
+        if (exactType.TINYINT() != null || exactType.INT1() != null) {
+          typeDefinition = new TypeDefinition(Type.INT1.name());
+        } else if (exactType.INT2() != null || exactType.SMALLINT() != null) {
+          typeDefinition = new TypeDefinition(Type.INT2.name());
+        } else if (exactType.INT4() != null || exactType.INTEGER() != null ||
+            exactType.INT() != null) {
+          typeDefinition = new TypeDefinition(Type.INT4.name());
+        } else if (exactType.INT8() != null || exactType.BIGINT() != null) {
+          typeDefinition = new TypeDefinition(Type.INT8.name());
+        } else if (exactType.NUMERIC() != null) {
+          typeDefinition = new TypeDefinition(Type.NUMERIC.name());
+        } else if (exactType.DECIMAL() != null || exactType.DEC() != null) {
+          typeDefinition = new TypeDefinition(Type.DECIMAL.name());
+        }
+
+        // Only NUMERIC and DECIMAL read the precision parameter; the scale is optional.
+        // NOTE(review): typeDefinition is dereferenced here without a null check; presumably
+        // one of the branches above always matches -- confirm against the grammar.
+        if (typeDefinition.getType().equals(Type.NUMERIC.name()) ||
+            typeDefinition.getType().equals(Type.DECIMAL.name())) {
+          if (exactType.precision_param() != null) {
+            if (exactType.precision_param().scale != null) {
+              typeDefinition.setScale(
+                  Integer.parseInt(exactType.precision_param().scale.getText()));
+            }
+            typeDefinition.setLengthOrPrecision(
+                Integer.parseInt(exactType.precision_param().precision.getText()));
+          }
+        }
+      } else { // approximate number
+        SQLParser.Approximate_numeric_typeContext approximateType =
+            predefined_type.numeric_type().approximate_numeric_type();
+        if (approximateType.FLOAT() != null || approximateType.FLOAT4() != null
+            || approximateType.REAL() != null) {
+          typeDefinition = new TypeDefinition(Type.FLOAT4.name());
+        } else if (approximateType.FLOAT8() != null || approximateType.DOUBLE() != null) {
+          typeDefinition = new TypeDefinition(Type.FLOAT8.name());
+        }
+      }
+    // ---- boolean ----
+    } else if (predefined_type.boolean_type() != null)  {
+      typeDefinition = new TypeDefinition(Type.BOOLEAN.name());
+    // ---- date and time; a ZONE token selects the zoned variant of TIME / TIMESTAMP ----
+    } else if (predefined_type.datetime_type() != null) {
+      SQLParser.Datetime_typeContext dateTimeType = predefined_type.datetime_type();
+      if (dateTimeType.DATE() != null) {
+        typeDefinition = new TypeDefinition(Type.DATE.name());
+      } else if (dateTimeType.TIME(0) != null && dateTimeType.ZONE() == null) {
+        typeDefinition = new TypeDefinition(Type.TIME.name());
+      } else if ((dateTimeType.TIME(0) != null && dateTimeType.ZONE() != null) ||
+          dateTimeType.TIMETZ() != null) {
+        typeDefinition = new TypeDefinition(Type.TIMEZ.name());
+      } else if (dateTimeType.TIMESTAMP() != null && dateTimeType.ZONE() == null) {
+        typeDefinition = new TypeDefinition(Type.TIMESTAMP.name());
+      } else if ((dateTimeType.TIMESTAMP() != null && dateTimeType.ZONE() != null) ||
+          dateTimeType.TIMESTAMPTZ() != null) {
+        typeDefinition = new TypeDefinition(Type.TIMESTAMPZ.name());
+      }
+    // ---- bit strings: VARBIT when VARBIT or VARYING is given, BIT otherwise ----
+    } else if (predefined_type.bit_type() != null) {
+      SQLParser.Bit_typeContext bitType = predefined_type.bit_type();
+      if (bitType.VARBIT() != null || bitType.VARYING() != null) {
+        typeDefinition = new TypeDefinition(Type.VARBIT.name());
+      } else {
+        typeDefinition = new TypeDefinition(Type.BIT.name());
+      }
+      if (bitType.type_length() != null) {
+        typeDefinition.setLengthOrPrecision(
+            Integer.parseInt(bitType.type_length().NUMBER().getText()));
+      }
+    // ---- binary strings: VARBINARY when VARBINARY or VARYING is given, BINARY otherwise ----
+    } else if (predefined_type.binary_type() != null) {
+      SQLParser.Binary_typeContext binaryType = predefined_type.binary_type();
+      if (binaryType.VARBINARY() != null || binaryType.VARYING() != null) {
+        typeDefinition = new TypeDefinition(Type.VARBINARY.name());
+      } else {
+        typeDefinition = new TypeDefinition(Type.BINARY.name());
+      }
+
+      if (binaryType.type_length() != null) {
+        typeDefinition.setLengthOrPrecision(
+            Integer.parseInt(binaryType.type_length().NUMBER().getText()));
+      }
+    }
+
+    return typeDefinition;
+  }
+
+  /**
+   * Plain holder for a type name together with an optional length (or precision) and an
+   * optional scale. Both optional values are {@code null} until they are set.
+   */
+  public static class TypeDefinition {
+    /** Type name; fixed at construction. */
+    private final String type;
+    /** Length or precision, or {@code null} when none was given. */
+    private Integer lengthOrPrecision;
+    /** Scale, or {@code null} when none was given. */
+    private Integer scale;
+
+    TypeDefinition(String type) {
+      this.type = type;
+    }
+
+    /** @return the type name passed to the constructor */
+    String getType() {
+      return this.type;
+    }
+
+    /** @return the length or precision, or {@code null} if unset */
+    public Integer getLengthOrPrecision() {
+      return this.lengthOrPrecision;
+    }
+
+    /** @param length_or_precision the length or precision to store */
+    public void setLengthOrPrecision(Integer length_or_precision) {
+      this.lengthOrPrecision = length_or_precision;
+    }
+
+    /** @return the scale, or {@code null} if unset */
+    public Integer getScale() {
+      return this.scale;
+    }
+
+    /** @param scale the scale to store */
+    void setScale(Integer scale) {
+      this.scale = scale;
+    }
+  }
+
+  /**
+   * Converts a DROP TABLE statement.
+   *
+   * @param ctx parse-tree node of a DROP TABLE statement
+   * @return a {@link DropTable} carrying the text of the table name
+   */
+  @Override
+  public Expr visitDrop_table_statement(SQLParser.Drop_table_statementContext ctx) {
+    final String tableName = ctx.table_name().getText();
+    return new DropTable(tableName);
+  }
+
+
+  /**
+   * Collects the key/value pairs of a parameter clause.
+   *
+   * <p>The first and last character (the quotes) of every key and value are removed. A key
+   * that occurs more than once keeps the value of its last occurrence.
+   *
+   * @param ctx parse-tree node of a parameter clause
+   * @return a new map of the unquoted parameters
+   */
+  private Map<String, String> getParams(SQLParser.Param_clauseContext ctx) {
+    final Map<String, String> result = new HashMap<String, String>();
+
+    int pos = 0;
+    while (pos < ctx.param().size()) {
+      String key = stripQuote(ctx.param(pos).key.getText());
+      String value = stripQuote(ctx.param(pos).value.getText());
+      result.put(key, value);
+      pos++;
+    }
+
+    return result;
+  }
+
+  /**
+   * Removes the first and the last character of a string.
+   *
+   * @param str a string of at least two characters, e.g. a quoted literal
+   * @return the text between the first and the last character
+   */
+  private static String stripQuote(String str) {
+    final int lastIndex = str.length() - 1;
+    return str.substring(1, lastIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
deleted file mode 100644
index 126e4bd..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.parser;
-
-import org.apache.tajo.engine.planner.PlanningContext;
-
-import java.util.Map.Entry;
-
-public class SetStmt extends ParseTree {
-  private ParseTree leftTree;
-  private ParseTree rightTree;
-  private boolean distinct = true;
-
-  public SetStmt(final PlanningContext context,
-                 final StatementType type,
-                 final ParseTree leftTree,
-                 final ParseTree rightTree,
-                 boolean distinct) {
-    super(context, type);
-    this.leftTree = leftTree;
-    this.rightTree = rightTree;
-    this.distinct = distinct;
-
-    for (Entry<String, String> entry : leftTree.getAliasToNames()) {
-       addTableRef(entry.getValue(), entry.getKey());
-    }
-
-    for (Entry<String, String> entry : rightTree.getAliasToNames()) {
-      addTableRef(entry.getValue(), entry.getKey());
-    }
-  }
-  
-  public boolean isDistinct() {
-    return distinct;
-  }
-  
-  public ParseTree getLeftTree() {
-    return this.leftTree;
-  }
-  
-  public ParseTree getRightTree() {
-    return this.rightTree;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
deleted file mode 100644
index de9b7e6..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.parser;
-
-public enum StatementType {
-	// Store
-  STORE,
-  CREATE_TABLE_AS,
-  COPY,
-
-  // Select
-	SELECT,
-	UNION,
-	EXCEPT,
-	INTERSECT,
-	
-	// Update
-	INSERT,
-	UPDATE,
-	DELETE,
-	
-	// Schema	
-	CREATE_TABLE,
-	DROP_TABLE,
-	
-	// INDEX
-	CREATE_INDEX,
-	DROP_INDEX,
-
-	// Control
-	SHOW_TABLES,
-	DESC_TABLE,
-	SHOW_FUNCTION,
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
deleted file mode 100644
index 2fce526..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.parser;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-public class TableMap {
-  private Map<String, String> aliasToNameMap = Maps.newHashMap();
-  private Map<String, Set<String>> nameToAliasMap = Maps.newHashMap();
-
-  public boolean exists(String alias) {
-    return aliasToNameMap.containsKey(alias);
-  }
-
-  public void addFromTable(String tableName, String alias) {
-    if (aliasToNameMap.containsKey(alias)) {
-      throw new AlreadyExistsTableException();
-    }
-    aliasToNameMap.put(alias, tableName);
-
-    if (nameToAliasMap.containsKey(tableName)) {
-      Preconditions.checkState(!nameToAliasMap.get(tableName).contains(alias),
-          "There is inconsistency of the map between name and alias");
-      nameToAliasMap.get(tableName).add(alias);
-    } else {
-      nameToAliasMap.put(tableName, Sets.newHashSet(alias));
-    }
-  }
-
-  public void addFromTable(QueryBlock.FromTable fromTable) {
-    addFromTable(fromTable.getTableName(), fromTable.getTableId());
-  }
-
-  public String getTableNameByAlias(String alias) {
-    return aliasToNameMap.get(alias);
-  }
-
-  public Iterable<String> getAllTableNames() {
-    return nameToAliasMap.keySet();
-  }
-
-  public Iterable<Entry<String, String>> getAliasToNames() {
-    return aliasToNameMap.entrySet();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
new file mode 100644
index 0000000..24161d4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/FromTable.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.util.TUtil;
+
+public class FromTable implements Cloneable {
+  @Expose
+  private TableDesc desc;
+  @Expose
+  private String alias = null;
+
+  public FromTable() {}
+
+  public FromTable(final TableDesc desc) {
+    this.desc = desc;
+  }
+
+  public FromTable(final TableDesc desc, final String alias) {
+    this(desc);
+    this.alias = alias;
+  }
+
+  public final String getTableName() {
+    return desc.getId();
+  }
+
+  public final String getTableId() {
+    return alias == null ? desc.getId() : alias;
+  }
+
+  public final CatalogProtos.StoreType getStoreType() {
+    return desc.getMeta().getStoreType();
+  }
+
+  public final Schema getSchema() {
+    return desc.getMeta().getSchema();
+  }
+
+  public final void setAlias(String alias) {
+    this.alias = alias;
+  }
+
+  public final String getAlias() {
+    return alias;
+  }
+
+  public final boolean hasAlias() {
+    return alias != null;
+  }
+
+  public final String toString() {
+    if (alias != null)
+      return desc.getId() + " as " + alias;
+    else
+      return desc.getId();
+  }
+
+  public boolean equals(Object obj) { // NOTE(review): no hashCode() override accompanies this equals() in the class
+    if (obj instanceof FromTable) {
+      FromTable other = (FromTable) obj;
+      return this.desc.equals(other.desc) // desc is dereferenced directly, so it must be non-null here
+          && TUtil.checkEquals(this.alias, other.alias);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    FromTable table = (FromTable) super.clone();
+    table.desc = (TableDesc) desc.clone();
+    table.alias = alias;
+
+    return table;
+  }
+
+  public String toJSON() {
+    desc.initFromProto();
+    Gson gson = GsonCreator.getInstance();
+    return gson.toJson(this, FromTable.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
new file mode 100644
index 0000000..3c8ee5c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.algebra.Aggregation;
+import org.apache.tajo.catalog.Column;
+
+public class GroupElement implements Cloneable {
+  @Expose
+  private Aggregation.GroupType type;
+  @Expose private Column[] columns;
+
+  @SuppressWarnings("unused")
+  public GroupElement() {
+    // for gson
+  }
+
+  public GroupElement(Aggregation.GroupType type, Column[] columns) {
+    this.type = type;
+    this.columns = columns;
+  }
+
+  public Aggregation.GroupType getType() {
+    return this.type;
+  }
+
+  public Column [] getColumns() {
+    return this.columns;
+  }
+
+  public String toString() {
+    Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
+        .setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+
+  public Object clone() throws CloneNotSupportedException { // deep copy: every Column is cloned into a new array
+    GroupElement groups = (GroupElement) super.clone();
+    groups.type = type;
+    groups.columns = new Column[columns.length];
+    for (int i = 0; i < columns.length; i++) {
+      groups.columns[i] = (Column) columns[i].clone(); // only the loop header advances i
+    }
+    return groups;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinClause.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinClause.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinClause.java
new file mode 100644
index 0000000..218875a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinClause.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.EvalNode;
+
+public class JoinClause implements Cloneable {
+  @Expose private JoinType joinType;
+  @Expose private FromTable left;
+  @Expose private FromTable right;
+  @Expose private JoinClause leftJoin;
+  @Expose private EvalNode joinQual;
+  @Expose private Column[] joinColumns;
+  @Expose private boolean natural = false;
+
+  @SuppressWarnings("unused")
+  public JoinClause() {
+    // for gson
+  }
+
+  public JoinClause(final JoinType joinType) {
+    this.joinType = joinType;
+  }
+
+  public JoinClause(final JoinType joinType, final FromTable right) {
+    this.joinType = joinType;
+    this.right = right;
+  }
+
+  public JoinClause(final JoinType joinType, final FromTable left,
+                    final FromTable right) {
+    this(joinType, right);
+    this.left = left;
+  }
+
+  public JoinType getJoinType() {
+    return this.joinType;
+  }
+
+  public void setNatural() {
+    this.natural = true;
+  }
+
+  public boolean isNatural() {
+    return this.natural;
+  }
+
+  public void setRight(FromTable right) {
+    this.right = right;
+  }
+
+  public void setLeft(FromTable left) {
+    this.left = left;
+  }
+
+  public void setLeft(JoinClause left) { // overload: a nested join is stored in leftJoin, not in the left table field
+    this.leftJoin = left;
+  }
+
+  public boolean hasLeftJoin() {
+    return leftJoin != null;
+  }
+
+  public FromTable getLeft() {
+    return this.left;
+  }
+
+  public FromTable getRight() {
+    return this.right;
+  }
+
+  public JoinClause getLeftJoin() {
+    return this.leftJoin;
+  }
+
+  public void setJoinQual(EvalNode qual) {
+    this.joinQual = qual;
+  }
+
+  public boolean hasJoinQual() {
+    return this.joinQual != null;
+  }
+
+  public EvalNode getJoinQual() {
+    return this.joinQual;
+  }
+
+  public void setJoinColumns(Column [] columns) {
+    this.joinColumns = columns;
+  }
+
+  public boolean hasJoinColumns() {
+    return this.joinColumns != null;
+  }
+
+  public Column [] getJoinColumns() {
+    return this.joinColumns;
+  }
+
+
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
deleted file mode 100644
index 2fbdab1..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner;
-
-public enum JoinType {
-  CROSS_JOIN,
-  INNER,
-	LEFT_OUTER,
-	RIGHT_OUTER,
-	FULL_OUTER,
-  UNION
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index d62ec69..3cb83ad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * 
- */
 package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
@@ -26,129 +23,59 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.FieldEval;
-import org.apache.tajo.engine.parser.CreateTableStmt;
-import org.apache.tajo.engine.parser.ParseTree;
-import org.apache.tajo.engine.parser.QueryBlock;
-import org.apache.tajo.engine.parser.QueryBlock.Target;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.query.exception.InvalidQueryException;
 import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.util.IndexUtil;
 
-import java.io.IOException;
 import java.util.*;
 
 /**
- * This class optimizes a logical plan corresponding to one query block.
+ * This class optimizes a logical plan.
  */
 public class LogicalOptimizer {
   private static Log LOG = LogFactory.getLog(LogicalOptimizer.class);
-  
+
   private LogicalOptimizer() {
   }
 
-  public static LogicalNode optimize(PlanningContext context, LogicalNode plan) {
-    LogicalNode toBeOptimized;
+  public static class OptimizationContext {
+    TargetListManager targetListManager;
 
-    try {
-      toBeOptimized = (LogicalNode) plan.clone();
-    } catch (CloneNotSupportedException e) {
-      LOG.error(e);
-      throw new InvalidQueryException("Cannot clone: " + plan);
+    public OptimizationContext(LogicalPlan plan, String blockName) {
+      this.targetListManager = new TargetListManager(plan, blockName);
     }
 
-    switch (context.getParseTree().getType()) {
-      case SELECT:
-        //case UNION: // TODO - to be implemented
-        //case EXCEPT:
-        //case INTERSECT:
-      case CREATE_TABLE_AS:
-        // if there are selection node
-        if(PlannerUtil.findTopNode(plan, ExprType.SELECTION) != null) {
-          pushSelection(context, toBeOptimized);
-        }
-
-        try {
-          pushProjection(context, toBeOptimized);
-        } catch (CloneNotSupportedException e) {
-          throw new InvalidQueryException(e);
-        }
-
-        break;
-      default:
+    public OptimizationContext(LogicalPlan plan, Target [] targets) {
+      this.targetListManager = new TargetListManager(plan, targets);
     }
 
-    return toBeOptimized;
-  }
-  
-  public static LogicalNode pushIndex(LogicalNode plan , StorageManager sm) throws IOException {
-    if(PlannerUtil.findTopNode(plan, ExprType.SCAN) == null) {
-      return plan;
-    }
-    LogicalNode toBeOptimized;
-    try {
-      toBeOptimized = (LogicalNode) plan.clone();
-    } catch (CloneNotSupportedException e) {
-      LOG.error(e);
-      throw new InvalidQueryException("Cannot clone: " + plan);
+    public TargetListManager getTargetListManager() {
+      return this.targetListManager;
     }
-  
-    changeScanToIndexNode(null ,toBeOptimized , sm);
-    return toBeOptimized;
-    
   }
-  private static void changeScanToIndexNode
-    (LogicalNode parent , LogicalNode cur , StorageManager sm ) throws IOException {
-    if( cur instanceof BinaryNode) {
-      changeScanToIndexNode(cur , ((BinaryNode)cur).getOuterNode() , sm);
-      changeScanToIndexNode(cur , ((BinaryNode)cur).getInnerNode() , sm);
+
+  public static LogicalNode optimize(LogicalPlan plan) throws CloneNotSupportedException {
+    LogicalNode toBeOptimized;
+
+    toBeOptimized = plan.getRootBlock().getRoot();
+
+    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
+      LOG.info("This query skips the logical optimization step.");
     } else {
-      switch(cur.getType()) {
-      case CREATE_INDEX:
-        return;
-      case SCAN:
-        ScanNode scan = (ScanNode)cur;
-        EvalNode qual = scan.getQual();
-        if(qual == null) { 
-          return;
-        }else {
-          String tableName = scan.getTableId();
-          Path path = new Path(sm.getTablePath(tableName), "index");
-         
-          if(sm.getFileSystem().exists(path)) {
-            
-            TableMeta meta = sm.getTableMeta(path);       
-            IndexScanNode node;
-            if ((node = IndexUtil.indexEval(scan, meta.getOptions())) == null) {
-              return;
-            }
-            if( parent instanceof BinaryNode ) {
-              if (scan.equals(((BinaryNode)parent).getOuterNode())) {
-                ((BinaryNode)parent).setOuter(node);
-              } else {
-                ((BinaryNode)parent).setInner(node);
-               }
-            } else {
-              ((UnaryNode)parent).setSubNode(node);
-            }
-          }
-          return;
-        }
-      default:
-        changeScanToIndexNode(cur , ((UnaryNode)cur).getSubNode() , sm);
-        break;
+      if(PlannerUtil.findTopNode(toBeOptimized, ExprType.SELECTION) != null) {
+            pushSelection(toBeOptimized);
       }
+      pushProjection(plan);
     }
+    return toBeOptimized;
   }
+
   /**
    * This method pushes down the selection into the appropriate sub 
    * logical operators.
@@ -166,11 +93,10 @@ public class LogicalOptimizer {
    * multiple relations are actually join conditions, and other expression 
    * on single relation can be used in a scan operator or an Index Scan 
    * operator.   
-   * 
-   * @param ctx
+   *
    * @param plan
    */
-  private static void pushSelection(PlanningContext ctx, LogicalNode plan) {
+  private static void pushSelection(LogicalNode plan) {
     SelectionNode selNode = (SelectionNode) PlannerUtil.findTopNode(plan,
         ExprType.SELECTION);
     Preconditions.checkNotNull(selNode);
@@ -184,7 +110,12 @@ public class LogicalOptimizer {
                                              List<EvalNode> cnf, Stack<LogicalNode> stack) {
     
     switch(plan.getType()) {
-    
+
+      case ROOT:
+        LogicalRootNode rootNode = (LogicalRootNode) plan;
+        pushSelectionRecursive(rootNode.getSubNode(), cnf, stack);
+        break;
+
     case SELECTION:
       SelectionNode selNode = (SelectionNode) plan;
       stack.push(selNode);
@@ -214,7 +145,7 @@ public class LogicalOptimizer {
 
       List<EvalNode> matched = Lists.newArrayList();
       for (EvalNode eval : cnf) {
-        if (canBeEvaluated(eval, plan)) {
+        if (PlannerUtil.canBeEvaluated(eval, plan)) {
           matched.add(eval);
         }
       }
@@ -249,7 +180,7 @@ public class LogicalOptimizer {
     case SCAN:
       matched = Lists.newArrayList();
       for (EvalNode eval : cnf) {
-        if (canBeEvaluated(eval, plan)) {
+        if (PlannerUtil.canBeEvaluated(eval, plan)) {
           matched.add(eval);
         }
       }
@@ -286,457 +217,336 @@ public class LogicalOptimizer {
       break;
     }
   }
-  
-  public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
-
-    if (node.getType() == ExprType.JOIN) {
-      JoinNode joinNode = (JoinNode) node;
-      Set<String> tableIds = Sets.newHashSet();
-      // getting distinct table references
-      for (Column col : columnRefs) {
-        if (!tableIds.contains(col.getTableName())) {
-          tableIds.add(col.getTableName());
-        }
-      }
-      
-      // if the references only indicate two relation, the condition can be 
-      // pushed into a join operator.
-      if (tableIds.size() != 2) {
-        return false;
-      }
-      
-      String [] outer = PlannerUtil.getLineage(joinNode.getOuterNode());
-      String [] inner = PlannerUtil.getLineage(joinNode.getInnerNode());
-
-      Set<String> o = Sets.newHashSet(outer);
-      Set<String> i = Sets.newHashSet(inner);
-      if (outer == null || inner == null) {      
-        throw new InvalidQueryException("ERROR: Unexpected logical plan");
-      }
-      Iterator<String> it = tableIds.iterator();
-      if (o.contains(it.next()) && i.contains(it.next())) {
-        return true;
-      }
-      
-      it = tableIds.iterator();
-      if (i.contains(it.next()) && o.contains(it.next())) {
-        return true;
-      }
-      
-      return false;
-    } else {
-      for (Column col : columnRefs) {
-        if (!node.getInSchema().contains(col.getQualifiedName())) {
-          return false;
-        }
-      }
-
-      return true;
-    }
-  }
 
   /**
    * This method pushes down the projection list into the appropriate and
    * below logical operators.
-   * @param context
    * @param plan
    */
-  private static void pushProjection(PlanningContext context,
-                                     LogicalNode plan)
+  private static void pushProjection(LogicalPlan plan)
       throws CloneNotSupportedException {
     Stack<LogicalNode> stack = new Stack<LogicalNode>();
 
-    ParseTree tree = context.getParseTree();
-    QueryBlock block;
-
-    if (tree instanceof QueryBlock) {
-      block = (QueryBlock) context.getParseTree();
-
-    } else if (tree instanceof CreateTableStmt) {
-
-      CreateTableStmt createTableStmt = (CreateTableStmt) tree;
+    OptimizationContext optCtx;
 
-      if (createTableStmt.hasQueryBlock()) {
-        block = createTableStmt.getSelectStmt();
-      } else {
-        return;
-      }
+    if (plan.getRootBlock().getProjection() != null &&
+        plan.getRootBlock().getProjection().isAllProjected()) {
+      optCtx = new OptimizationContext(plan,
+          plan.getRootBlock().getProjectionNode().getTargets());
     } else {
-
-      return;
+      optCtx = new OptimizationContext(plan, LogicalPlan.ROOT_BLOCK);
     }
-    OptimizationContext optCtx = new OptimizationContext(context,
-        block.getTargetList());
 
-    pushProjectionRecursive(context, optCtx, plan, stack, new HashSet<Column>());
+    Collection<Column> finalSchema = plan.getRootBlock().getSchema().getColumns();
+
+    pushProjectionRecursive(plan, optCtx, plan.getRootBlock().getRoot(), stack,
+        new HashSet<Column>(finalSchema));
+
   }
 
   /**
-   * Groupby, Join, and Scan can project necessary columns.
-   * This method has three roles:
-   * 1) collect column reference necessary for sortkeys, join keys, selection conditions, grouping fields,
-   * and having conditions
+   * This method visits all operators recursively and shrinks the output columns of each operator to only
+   * the columns that are actually necessary.
+   *
+   * This method has three steps:
+   *
+   * 1) collect the column references necessary for each operator. For example, sort requires sort keys,
+   * join requires join conditions, selection requires filter conditions, and groupby requires grouping keys and
+   * having conditions.
    * 2) shrink the output schema of each operator so that the operator reduces the output columns according to
    * the necessary columns of their parent operators
    * 3) shrink the input schema of each operator according to the shrunk output schemas of the child operators.
-   *
-   *
-   * @param ctx
-   * @param node
-   * //@param necessary - columns necessary for above logical nodes, but it excepts the fields for the target list
-   * //@param targetList
-   * @return
    */
   private static LogicalNode pushProjectionRecursive(
-      final PlanningContext ctx, final OptimizationContext optContext,
+      final LogicalPlan plan, final OptimizationContext optContext,
       final LogicalNode node, final Stack<LogicalNode> stack,
-      final Set<Column> necessary) throws CloneNotSupportedException {
+      final Set<Column> upperRequired) throws CloneNotSupportedException {
+
+    LogicalNode currentNode = null;
 
-    LogicalNode outer;
-    LogicalNode inner;
     switch (node.getType()) {
-      case ROOT: // It does not support the projection
-        LogicalRootNode root = (LogicalRootNode) node;
-        stack.add(root);
-        outer = pushProjectionRecursive(ctx, optContext,
-            root.getSubNode(), stack, necessary);
-        root.setInSchema(outer.getOutSchema());
-        root.setOutSchema(outer.getOutSchema());
+      // They need only simple work
+      case ROOT:
+      case STORE:
+      case LIMIT:
+        currentNode = pushDownCommonPost(plan, optContext, (UnaryNode) node, upperRequired, stack);
         break;
 
-      case STORE:
-        StoreTableNode store = (StoreTableNode) node;
-        stack.add(store);
-        outer = pushProjectionRecursive(ctx, optContext,
-            store.getSubNode(), stack, necessary);
-        store.setInSchema(outer.getOutSchema());
-        store.setOutSchema(outer.getOutSchema());
+      // They need special preprocessing.
+      case SELECTION:
+        currentNode = pushDownSelection(plan, optContext, (SelectionNode) node, upperRequired, stack);
+        break;
+      case SORT:
+        currentNode = pushDownSort(plan, optContext, (SortNode) node, upperRequired, stack);
+        break;
+      case UNION:
+      case EXCEPT:
+      case INTERSECT:
+        currentNode = pushDownSetNode(plan, optContext, (UnionNode) node, upperRequired, stack);
         break;
 
+      // Projection, GroupBy, Join, and Scan are all projectable.
+      // A projectable operator can shrink or expand output columns through alias name and expressions.
       case PROJECTION:
-        ProjectionNode projNode = (ProjectionNode) node;
-
-        stack.add(projNode);
-        outer = pushProjectionRecursive(ctx, optContext,
-            projNode.getSubNode(), stack, necessary);
-        stack.pop();
-
-        LogicalNode childNode = projNode.getSubNode();
-        if (optContext.getTargetListManager().isAllEvaluated() // if all exprs are evaluated
-            && (childNode.getType() == ExprType.JOIN ||
-               childNode.getType() == ExprType.GROUP_BY ||
-               childNode.getType() == ExprType.SCAN)) { // if the child node is projectable
-            projNode.getSubNode().setOutSchema(
-                optContext.getTargetListManager().getUpdatedSchema());
-            LogicalNode parent = stack.peek();
-            ((UnaryNode)parent).setSubNode(projNode.getSubNode());
-            return projNode.getSubNode();
-        } else {
-          // the output schema is not changed.
-          projNode.setInSchema(outer.getOutSchema());
-          projNode.setTargetList(
-              optContext.getTargetListManager().getUpdatedTarget());
-        }
-        return projNode;
+        currentNode = pushDownProjection(plan, optContext, (ProjectionNode) node, upperRequired, stack);
+        break;
+      case GROUP_BY:
+        currentNode = pushDownGroupBy(plan, optContext, (GroupbyNode) node, upperRequired, stack);
+        break;
+      case JOIN:
+        currentNode = pushDownJoin(plan, optContext, (JoinNode) node, upperRequired, stack);
+        break;
+      case SCAN:
+        currentNode = pushdownScanNode(optContext, (ScanNode) node, upperRequired, stack);
+        break;
 
-      case SELECTION: // It does not support the projection
-        SelectionNode selNode = (SelectionNode) node;
+      default:
+    }
 
-        if (selNode.getQual() != null) {
-          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(selNode.getQual()));
-        }
+    return currentNode;
+  }
 
-        stack.add(selNode);
-        outer = pushProjectionRecursive(ctx, optContext, selNode.getSubNode(),
-            stack, necessary);
-        stack.pop();
-        selNode.setInSchema(outer.getOutSchema());
-        selNode.setOutSchema(outer.getOutSchema());
-        break;
+  private static LogicalNode pushDownCommonPost(LogicalPlan plan, OptimizationContext context, UnaryNode node,
+                                                Set<Column> upperRequired, Stack<LogicalNode> stack) throws CloneNotSupportedException {
+    stack.push(node);
+    LogicalNode child = pushProjectionRecursive(plan, context,
+        node.getSubNode(), stack, upperRequired);
+    stack.pop();
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+
+    if (node instanceof Projectable) {
+      pushDownProjectablePost(context, node, upperRequired, isTopmostProjectable(stack));
+    }
+    return node;
+  }
 
-      case GROUP_BY: {
-        GroupbyNode groupByNode = (GroupbyNode)node;
+  private static LogicalNode pushDownProjection(LogicalPlan plan, OptimizationContext context,
+                                                   ProjectionNode projNode, Set<Column> upperRequired,
+                                                   Stack<LogicalNode> path) throws CloneNotSupportedException {
 
-        if (groupByNode.hasHavingCondition()) {
-          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(groupByNode.getHavingCondition()));
-        }
+    for (Target target : projNode.getTargets()) {
+      upperRequired.add(target.getColumnSchema());
+    }
 
-        stack.add(groupByNode);
-        outer = pushProjectionRecursive(ctx, optContext,
-            groupByNode.getSubNode(), stack, necessary);
-        stack.pop();
-        groupByNode.setInSchema(outer.getOutSchema());
-        // set all targets
-        groupByNode.setTargetList(optContext.getTargetListManager().getUpdatedTarget());
-
-        TargetListManager targets = optContext.getTargetListManager();
-        List<Target> groupbyPushable = Lists.newArrayList();
-        List<Integer> groupbyPushableId = Lists.newArrayList();
-
-        EvalNode expr;
-        for (int i = 0; i < targets.size(); i++) {
-          expr = targets.getTarget(i).getEvalTree();
-          if (canBeEvaluated(expr, groupByNode) &&
-              EvalTreeUtil.findDistinctAggFunction(expr).size() > 0 && expr.getType() != EvalNode.Type.FIELD) {
-            targets.setEvaluated(i);
-            groupbyPushable.add((Target) targets.getTarget(i).clone());
-            groupbyPushableId.add(i);
-          }
-        }
+    path.push(projNode);
+    LogicalNode child = pushProjectionRecursive(plan, context, projNode.getSubNode(), path, upperRequired);
+    path.pop();
 
-        return groupByNode;
-      }
+    LogicalNode childNode = projNode.getSubNode();
 
-      case SORT: // It does not support the projection
-        SortNode sortNode = (SortNode) node;
+    // If all expressions are evaluated in the child operators and the child operator is projectable,
+    // the ProjectionNode is no longer necessary, so it is removed from the plan.
+    if (context.getTargetListManager().isAllEvaluated() && (childNode instanceof Projectable)) {
+      LogicalNode parent = path.peek();
+      // update the child node's output schemas
+      child.setOutSchema(context.getTargetListManager().getUpdatedSchema());
+      PlannerUtil.deleteNode(parent, projNode);
+      return child;
+    } else {
+      projNode.setInSchema(child.getOutSchema());
+      projNode.setTargets(context.getTargetListManager().getUpdatedTarget());
+      return projNode;
+    }
+  }
 
-        for (SortSpec spec : sortNode.getSortKeys()) {
-          necessary.add(spec.getSortKey());
-        }
+  private static LogicalNode pushDownSelection(LogicalPlan plan, OptimizationContext context,
+                                                 SelectionNode selectionNode, Set<Column> upperRequired,
+                                                 Stack<LogicalNode> path) throws CloneNotSupportedException {
+    if (selectionNode.getQual() != null) {
+      upperRequired.addAll(EvalTreeUtil.findDistinctRefColumns(selectionNode.getQual()));
+    }
 
-        stack.add(sortNode);
-        outer = pushProjectionRecursive(ctx, optContext,
-            sortNode.getSubNode(), stack, necessary);
-        stack.pop();
-        sortNode.setInSchema(outer.getOutSchema());
-        sortNode.setOutSchema(outer.getOutSchema());
-        break;
+    return pushDownCommonPost(plan, context, selectionNode, upperRequired, path);
+  }
 
+  private static GroupbyNode pushDownGroupBy(LogicalPlan plan, OptimizationContext context, GroupbyNode groupbyNode,
+                                             Set<Column> upperRequired, Stack<LogicalNode> stack)
+      throws CloneNotSupportedException {
 
-      case JOIN: {
-        JoinNode joinNode = (JoinNode) node;
-        Set<Column> parentNecessary = Sets.newHashSet(necessary);
+    Set<Column> currentRequired = new HashSet<Column>(upperRequired);
 
-        if (joinNode.hasJoinQual()) {
-          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(joinNode.getJoinQual()));
-        }
+    if (groupbyNode.hasHavingCondition()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(groupbyNode.getHavingCondition()));
+    }
 
-        stack.add(joinNode);
-        outer = pushProjectionRecursive(ctx, optContext,
-            joinNode.getOuterNode(), stack, necessary);
-        inner = pushProjectionRecursive(ctx, optContext,
-            joinNode.getInnerNode(), stack, necessary);
-        stack.pop();
-        Schema merged = SchemaUtil
-            .merge(outer.getOutSchema(), inner.getOutSchema());
-        joinNode.setInSchema(merged);
-
-        TargetListManager targets = optContext.getTargetListManager();
-        List<Target> joinPushable = Lists.newArrayList();
-        List<Integer> joinPushableId = Lists.newArrayList();
-        EvalNode expr;
-        for (int i = 0; i < targets.size(); i++) {
-          expr = targets.getTarget(i).getEvalTree();
-          if (canBeEvaluated(expr, joinNode)
-              && EvalTreeUtil.findDistinctAggFunction(expr).size() == 0
-              && expr.getType() != EvalNode.Type.FIELD) {
-            targets.setEvaluated(i);
-            joinPushable.add(targets.getTarget(i));
-            joinPushableId.add(i);
-          }
-        }
-        if (joinPushable.size() > 0) {
-          joinNode.setTargetList(targets.targets);
-        }
+    for (Target target : groupbyNode.getTargets()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
+    }
 
-        Schema outSchema = shrinkOutSchema(joinNode.getInSchema(), targets.getUpdatedSchema().getColumns());
-        for (Integer t : joinPushableId) {
-          outSchema.addColumn(targets.getEvaluatedColumn(t));
-        }
-        outSchema = SchemaUtil.mergeAllWithNoDup(outSchema.getColumns(),
-            SchemaUtil.getProjectedSchema(joinNode.getInSchema(),parentNecessary).getColumns());
-        joinNode.setOutSchema(outSchema);
-        break;
-      }
+    pushDownCommonPost(plan, context, groupbyNode, currentRequired, stack);
+    return groupbyNode;
+  }
 
-      case UNION:  // It does not support the projection
-        UnionNode unionNode = (UnionNode) node;
-        stack.add(unionNode);
+  private static SortNode pushDownSort(LogicalPlan plan, OptimizationContext context, SortNode sortNode,
+                                       Set<Column> upperRequired, Stack<LogicalNode> stack)
+      throws CloneNotSupportedException {
 
-        ParseTree tree =  ctx.getParseTree();
-        if (tree instanceof CreateTableStmt) {
-          tree = ((CreateTableStmt) tree).getSelectStmt();
-        }
-        QueryBlock block = (QueryBlock) tree;
-
-        OptimizationContext outerCtx = new OptimizationContext(ctx,
-            block.getTargetList());
-        OptimizationContext innerCtx = new OptimizationContext(ctx,
-            block.getTargetList());
-        pushProjectionRecursive(ctx, outerCtx, unionNode.getOuterNode(),
-            stack, necessary);
-        pushProjectionRecursive(ctx, innerCtx, unionNode.getInnerNode(),
-            stack, necessary);
-        stack.pop();
-
-        // if this is the final union, we assume that all targets are evalauted
-        // TODO - is it always correct?
-        if (stack.peek().getType() != ExprType.UNION) {
-          optContext.getTargetListManager().setEvaluatedAll();
-        }
-        break;
+    for (SortSpec spec : sortNode.getSortKeys()) {
+      upperRequired.add(spec.getSortKey());
+    }
 
-      case SCAN: {
-        ScanNode scanNode = (ScanNode) node;
-        TargetListManager targets = optContext.getTargetListManager();
-        List<Integer> scanPushableId = Lists.newArrayList();
-        List<Target> scanPushable = Lists.newArrayList();
-        EvalNode expr;
-        for (int i = 0; i < targets.size(); i++) {
-          expr = targets.getTarget(i).getEvalTree();
-          if (!targets.isEvaluated(i) && canBeEvaluated(expr, scanNode)) {
-            if (expr.getType() == EvalNode.Type.FIELD) {
-              targets.setEvaluated(i);
-            } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
-              targets.setEvaluated(i);
-              scanPushable.add(targets.getTarget(i));
-              scanPushableId.add(i);
-            }
-          }
-        }
+    pushDownCommonPost(plan, context, sortNode, upperRequired, stack);
 
-        if (scanPushable.size() > 0) {
-          scanNode.setTargets(scanPushable.toArray(new Target[scanPushable.size()]));
-        }
-        Schema outSchema = shrinkOutSchema(scanNode.getInSchema(), targets.getUpdatedSchema().getColumns());
-        for (Integer t : scanPushableId) {
-          outSchema.addColumn(targets.getEvaluatedColumn(t));
-        }
-        outSchema = SchemaUtil.mergeAllWithNoDup(outSchema.getColumns(), SchemaUtil.getProjectedSchema(scanNode.getInSchema(),necessary).getColumns());
-        scanNode.setOutSchema(outSchema);
+    return sortNode;
+  }
 
-        break;
+  private static JoinNode pushDownJoin(LogicalPlan plan, OptimizationContext context, JoinNode joinNode,
+                                       Set<Column> upperRequired, Stack<LogicalNode> path)
+      throws CloneNotSupportedException {
+    Set<Column> currentRequired = Sets.newHashSet(upperRequired);
+
+    if (joinNode.hasTargets()) {
+      EvalNode expr;
+      for (Target target : joinNode.getTargets()) {
+        expr = target.getEvalTree();
+        if (expr.getType() != EvalNode.Type.FIELD) {
+          currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
+        }
       }
+    }
 
-      default:
+    if (joinNode.hasJoinQual()) {
+      currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(joinNode.getJoinQual()));
     }
 
-    return node;
+    path.push(joinNode);
+    LogicalNode outer = pushProjectionRecursive(plan, context,
+        joinNode.getOuterNode(), path, currentRequired);
+    LogicalNode inner = pushProjectionRecursive(plan, context,
+        joinNode.getInnerNode(), path, currentRequired);
+    path.pop();
+
+    Schema merged = SchemaUtil.merge(outer.getOutSchema(), inner.getOutSchema());
+    joinNode.setInSchema(merged);
+    pushDownProjectablePost(context, joinNode, upperRequired, isTopmostProjectable(path));
+
+    return joinNode;
   }
 
-  private static Schema shrinkOutSchema(Schema inSchema, Collection<Column> necessary) {
-    Schema projected = new Schema();
-    for(Column col : inSchema.getColumns()) {
-      if(necessary.contains(col)) {
-        projected.addColumn(col);
-      }
+  private static BinaryNode pushDownSetNode(LogicalPlan plan, OptimizationContext context, UnionNode node,
+                                            Set<Column> upperRequired, Stack<LogicalNode> stack)
+      throws CloneNotSupportedException {
+    BinaryNode setNode = node;
+
+    LogicalPlan.QueryBlock leftBlock = plan.getBlock(setNode.getOuterNode());
+    OptimizationContext leftCtx = new OptimizationContext(plan,
+        leftBlock.getTargetListManager().getUnEvaluatedTargets());
+    LogicalPlan.QueryBlock rightBlock = plan.getBlock(setNode.getInnerNode());
+    OptimizationContext rightCtx = new OptimizationContext(plan,
+        rightBlock.getTargetListManager().getUnEvaluatedTargets());
+
+    stack.push(setNode);
+    pushProjectionRecursive(plan, leftCtx, setNode.getOuterNode(), stack, upperRequired);
+    pushProjectionRecursive(plan, rightCtx, setNode.getInnerNode(), stack, upperRequired);
+    stack.pop();
+
+    // if this is the final union, we assume that all targets are evaluated.
+    // TODO - is it always correct?
+    if (stack.peek().getType() != ExprType.UNION) {
+      context.getTargetListManager().setEvaluatedAll();
     }
-    return projected;
+
+    return setNode;
   }
 
-  public static class OptimizationContext {
-    PlanningContext context;
-    TargetListManager targetListManager;
+  private static ScanNode pushdownScanNode(OptimizationContext optContext, ScanNode scanNode,
+                                           Set<Column> upperRequired, Stack<LogicalNode> stack)
+      throws CloneNotSupportedException {
+    return (ScanNode) pushDownProjectablePost(optContext, scanNode, upperRequired, isTopmostProjectable(stack));
+  }
 
-    public OptimizationContext(PlanningContext context, Target [] targets) {
-      this.context = context;
-      this.targetListManager = new TargetListManager(context, targets);
+  private static boolean isTopmostProjectable(Stack<LogicalNode> stack) {
+    for (LogicalNode node : stack) {
+      if (node.getType() == ExprType.JOIN || node.getType() == ExprType.GROUP_BY) {
+        return false;
+      }
     }
 
-    public TargetListManager getTargetListManager() {
-      return this.targetListManager;
-    }
+    return true;
   }
 
-  public static class TargetListManager {
-    private PlanningContext context;
-    private boolean [] evaluated;
-    private Target [] targets;
-
-    public TargetListManager(PlanningContext context, Target [] targets) {
-      this.context = context;
-      if (targets == null) {
-        evaluated = new boolean[0];
-      } else {
-        evaluated = new boolean[targets.length];
-      }
-      this.targets = targets;
-    }
+  private static LogicalNode pushDownProjectablePost(OptimizationContext context, LogicalNode node,
+                                                     Set<Column> upperRequired, boolean last)
+      throws CloneNotSupportedException {
+    TargetListManager targetListManager = context.getTargetListManager();
+    EvalNode expr;
 
-    public Target getTarget(int id) {
-      return targets[id];
-    }
+    List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
 
-    public Target [] getTargets() {
-      return this.targets;
-    }
+    for (int i = 0; i < targetListManager.size(); i++) {
+      expr = targetListManager.getTarget(i).getEvalTree();
 
-    public int size() {
-      return targets.length;
-    }
+      if (!targetListManager.isEvaluated(i) && PlannerUtil.canBeEvaluated(expr, node)) {
 
-    public void setEvaluated(int id) {
-      evaluated[id] = true;
-    }
+        if (node instanceof ScanNode) { // For ScanNode
 
-    public void setEvaluatedAll() {
-      for (int i = 0; i < evaluated.length; i++) {
-        evaluated[i] = true;
-      }
-    }
+          if (expr.getType() == EvalNode.Type.FIELD && !targetListManager.getTarget(i).hasAlias()) {
+            targetListManager.setEvaluated(i);
+          } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
 
-    private boolean isEvaluated(int id) {
-      return evaluated[id];
-    }
+        } else if (node instanceof GroupbyNode) { // For GroupBy
+          if (EvalTreeUtil.findDistinctAggFunction(expr).size() > 0 && expr.getType() != EvalNode.Type.FIELD) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
 
-    public Target [] getUpdatedTarget() throws CloneNotSupportedException {
-      Target [] updated = new Target[evaluated.length];
-      for (int i = 0; i < evaluated.length; i++) {
-        if (evaluated[i]) {
-          Column col = getEvaluatedColumn(i);
-          updated[i] = new Target(new FieldEval(col), i);
-        } else {
-          updated[i] = (Target) targets[i].clone();
+        } else if (node instanceof JoinNode) {
+          if (expr.getType() != EvalNode.Type.FIELD && EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+            targetListManager.setEvaluated(i);
+            newEvaluatedTargetIds.add(i);
+          }
         }
       }
-      return updated;
     }
 
-    public Schema getUpdatedSchema() {
-      Schema schema = new Schema();
-      for (int i = 0; i < evaluated.length; i++) {
-        if (evaluated[i]) {
-          Column col = getEvaluatedColumn(i);
-          schema.addColumn(col);
-        } else {
-          Collection<Column> cols = getColumnRefs(i);
-          for (Column col : cols) {
-            if (!schema.contains(col.getQualifiedName())) {
-              schema.addColumn(col);
-            }
-          }
-        }
+    Projectable projectable = (Projectable) node;
+    if (last) {
+      Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated.");
+      projectable.setTargets(targetListManager.getTargets());
+      targetListManager.getUpdatedTarget();
+      node.setOutSchema(targetListManager.getUpdatedSchema());
+    } else {
+    // Prepare the targets regardless of whether the node has targets or not.
+    // This part is required because some nodes do not have any targets
+    // when their input and output schemas are the same.
+
+    Target [] checkingTargets;
+    if (!projectable.hasTargets()) {
+      Schema outSchema = node.getOutSchema();
+      checkingTargets = new Target[outSchema.getColumnNum() + newEvaluatedTargetIds.size()];
+      PlannerUtil.schemaToTargets(outSchema, checkingTargets);
+      int baseIdx = outSchema.getColumnNum();
+      for (int i = 0; i < newEvaluatedTargetIds.size(); i++) {
+        checkingTargets[baseIdx + i] = targetListManager.getTarget(newEvaluatedTargetIds.get(i));
       }
-
-      return schema;
+    } else {
+      checkingTargets = projectable.getTargets();
     }
 
-    public Collection<Column> getColumnRefs(int id) {
-      return EvalTreeUtil.findDistinctRefColumns(targets[id].getEvalTree());
-    }
+    List<Target> projectedTargets = new ArrayList<Target>();
+    for (Column column : upperRequired) {
+      for (Target target : checkingTargets) {
 
-    public Column getEvaluatedColumn(int id) {
-      Target t = targets[id];
-      String name;
-      if (t.hasAlias()) {
-        name = t.getAlias();
-      } else if (t.getEvalTree().getName().equals("?")) {
-        name = context.getGeneratedColumnName();
-      } else {
-        name = t.getEvalTree().getName();
-      }
-      return new Column(name, t.getEvalTree().getValueType()[0]);
-    }
+        if (target.hasAlias() && target.getAlias().equalsIgnoreCase(column.getQualifiedName())) {
+            projectedTargets.add(target);
+        } else {
 
-    public boolean isAllEvaluated() {
-      for (boolean isEval : evaluated) {
-        if (!isEval) {
-          return false;
+          if (target.getColumnSchema().equals(column)) {
+            projectedTargets.add(target);
+          }
         }
       }
+    }
 
-      return true;
+    projectable.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    targetListManager.getUpdatedTarget();
+    node.setOutSchema(PlannerUtil.targetToSchema(projectable.getTargets()));
     }
+
+    return node;
   }
 }
\ No newline at end of file