You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/29 06:03:38 UTC

[09/13] TAJO-87: Integration of tajo algebra module and SQL parser

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
new file mode 100644
index 0000000..5a9f101
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.ColumnReferenceExpr;
+import org.apache.tajo.algebra.Projection;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.logical.*;
+
+import java.util.*;
+
+/**
+ * This represents and keeps every information about a query plan for a query.
+ */
+@NotThreadSafe
+public class LogicalPlan {
+  private final LogicalPlanner planner;
+
+  /** the prefix character for virtual tables */
+  public static final char VIRTUAL_TABLE_PREFIX='@';
+  /** it indicates the root block */
+  public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
+  /** it indicates a table itself */
+  public static final String TABLE_SELF = VIRTUAL_TABLE_PREFIX + "SELF";
+
+  public static final String ANONYMOUS_TABLE_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
+  public static Integer anonymousBlockId = 0;
+  public static Integer anonymousColumnId = 0;
+
+  /** a map from a block name to its query block */
+  private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
+  private Map<LogicalNode, QueryBlock> queryBlockByNode = new HashMap<LogicalNode, QueryBlock>();
+  private Set<LogicalNode> visited = new HashSet<LogicalNode>();
+
+  public LogicalPlan(LogicalPlanner planner) {
+    this.planner = planner;
+  }
+
+  /**
+   * Create a new {@link QueryBlock} and Get
+   *
+   * @param blockName the query block name
+   * @return a created query block
+   */
+  public QueryBlock newAndGetBlock(String blockName) {
+    QueryBlock block = new QueryBlock(blockName);
+    queryBlocks.put(blockName, block);
+    return block;
+  }
+
+  public QueryBlock newAnonymousBlock() {
+    return newAndGetBlock(ANONYMOUS_TABLE_PREFIX + (anonymousBlockId++));
+  }
+
+  public String newAnonymousColumnName() {
+    return "column_" + (anonymousColumnId ++);
+  }
+
+  /**
+   * Check if a query block exists
+   * @param blockName the query block name to be checked
+   * @return true if exists. Otherwise, false
+   */
+  public boolean existBlock(String blockName) {
+    return queryBlocks.containsKey(blockName);
+  }
+
+  public QueryBlock getRootBlock() {
+    return queryBlocks.get(ROOT_BLOCK);
+  }
+
+  public QueryBlock getBlock(String blockName) {
+    return queryBlocks.get(blockName);
+  }
+
+  public QueryBlock getBlock(LogicalNode node) {
+    return queryBlockByNode.get(node);
+  }
+
+  public Collection<QueryBlock> getQueryBlocks() {
+    return queryBlocks.values();
+  }
+
+  public boolean postVisit(String blockName, LogicalNode node, Stack<ExprType> path) {
+    if (visited.contains(node)) {
+      return false;
+    }
+
+    QueryBlock block = queryBlocks.get(blockName);
+
+    // if an added operator is a relation, add it to relation set.
+    switch (node.getType()) {
+      case SCAN:
+        ScanNode relationOp = (ScanNode) node;
+        block.addRelation(relationOp);
+        break;
+
+      case GROUP_BY:
+        block.resolveGrouping();
+        break;
+    }
+
+
+    // if this node is the topmost
+    if (path.size() == 0) {
+      block.setRoot(node);
+    }
+
+    return true;
+  }
+
+  /**
+   * It tries to find a column with a qualified column name, looking in the given block first and
+   * then in the other blocks. The returned column is a copy renamed to "tableId.columnName".
+   * @throws VerifyException this exception occurs if there is no column matched to the given name.
+   */
+  public Column findColumnFromRelation(String blockName, String relName, String name)
+      throws VerifyException {
+
+    QueryBlock block = queryBlocks.get(blockName);
+    ScanNode relationOp = block.getRelation(relName);
+
+    // if a column name is outside of this query block
+    if (relationOp == null) {
+      // TODO - nested query can only refer outer query block? or not?
+      for (QueryBlock eachBlock : queryBlocks.values()) {
+        if (eachBlock.containRelation(relName)) {
+          relationOp = eachBlock.getRelation(relName);
+        }
+      }
+    }
+
+    if (relationOp == null) {
+      throw new NoSuchColumnException(relName + "." + name);
+    }
+
+    Schema schema = relationOp.getTableSchema();
+
+    Column column = schema.getColumnByName(name);
+    if (column == null) {
+      throw new VerifyException("ERROR: no such a column "+ name);
+    }
+    // copy before renaming so that the column owned by the table schema is never modified
+    try {
+      column = (Column) column.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e); // fail fast instead of renaming the schema's own column
+    }
+    String tableName = relationOp.getTableId();
+    column.setName(tableName + "." + column.getColumnName());
+
+    return column;
+  }
+
+  /**
+   * Try to find a column in the output schema of the given child node. A qualified reference is
+   * looked up by its canonical name first and then by the table id of the referenced relation.
+   * @throws VerifyException if no column in the output schema matches the reference
+   */
+  public Column findColumnFromChildNode(ColumnReferenceExpr columnRef, String blockName,
+                                        LogicalNode node)
+      throws VerifyException{
+    List<Column> candidates = new ArrayList<Column>();
+
+    Column candidate;
+    if (columnRef.hasTableName()) {
+      candidate = node.getOutSchema().getColumn(columnRef.getCanonicalName());
+      if (candidate == null) { // If not found, try to find the column with alias name
+        ScanNode relation = getBlock(blockName).getRelation(columnRef.getTableName());
+        candidate = relation == null ? null
+            : node.getOutSchema().getColumn(relation.getTableId() + "." + columnRef.getName());
+      }
+    } else {
+      candidate = node.getOutSchema().getColumnByName(columnRef.getName());
+    }
+    if (candidate != null) { // a failed lookup must not be recorded as a (null) match
+      candidates.add(candidate);
+    }
+
+    if (candidates.isEmpty()) {
+      throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName());
+    } else  if (candidates.size() > 1) {
+      throw new VerifyException("ERROR: column name "+ columnRef.getCanonicalName()
+          + " is ambiguous");
+    }
+
+    return candidates.get(0);
+  }
+
+
+  public Column findColumn(String blockName, ColumnReferenceExpr columnRef) throws VerifyException {
+    if (columnRef.hasTableName()) {
+      return findColumnFromRelation(blockName, columnRef.getTableName(), columnRef.getName());
+    } else {
+      return suspectColumn(blockName, columnRef.getName());
+    }
+  }
+
+  /**
+   * This method tries to find one column with only column name.
+   * If it does not find any column corresponding to the given name, it tries to find the column from other blocks.
+   * If it finds two or more columns corresponding to the given name, it throws {@link VerifyException}.
+   *
+   * @param blockName The block name is the first priority block used for searching a column corresponding to the name.
+   * @param name The column name to be found
+   * @return the found column
+   *
+   * @throws VerifyException If there are two or more found columns, the exception will be caused.
+   */
+  public Column suspectColumn(String blockName, String name) throws VerifyException {
+    List<Column> candidates = new ArrayList<Column>();
+    Column candidate;
+
+    // Try to find a column from the current query block
+    for (ScanNode rel : queryBlocks.get(blockName).getRelations()) {
+      candidate = findColumnFromRelationOp(rel, name);
+
+      if (candidate != null) {
+        if (!blockName.equals(LogicalPlan.ROOT_BLOCK)) {
+          try {
+            candidate = (Column) candidate.clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e); // fail fast, as findColumnFromRelationOp does
+          }
+          candidate.setName(rel.getTableId() + "." + candidate.getColumnName());
+        }
+        candidates.add(candidate);
+        // NOTE: the search stops at the first match, so at most one candidate is ever
+        // collected and the ambiguity check at the end of this method cannot fire.
+        break;
+      }
+    }
+
+    // if a column is not found, try to find the column from outer blocks.
+    if (candidates.isEmpty()) {
+      // for each block
+      Outer:
+      for (QueryBlock block : queryBlocks.values()) {
+        for (ScanNode rel : block.getRelations()) {
+          candidate = findColumnFromRelationOp(rel, name);
+
+          if (candidate != null) {
+            if (!blockName.equals(LogicalPlan.ROOT_BLOCK)) {
+              try {
+                candidate = (Column) candidate.clone();
+              } catch (CloneNotSupportedException e) {
+                throw new RuntimeException(e); // fail fast, as findColumnFromRelationOp does
+              }
+              candidate.setName(rel.getTableId() + "." + candidate.getColumnName());
+            }
+            candidates.add(candidate);
+            // the first match wins here as well
+            break Outer;
+          }
+        }
+      }
+    }
+
+    if (candidates.isEmpty()) {
+      throw new VerifyException("ERROR: no such a column name "+ name);
+    } else  if (candidates.size() > 1) {
+      throw new VerifyException("ERROR: column name "+ name + " is ambiguous");
+    }
+
+    return candidates.get(0);
+  }
+
+  private Column findColumnFromRelationOp(ScanNode relation, String name) throws VerifyException {
+    Column candidate = relation.getTableSchema().getColumnByName(name);
+    if (candidate != null) {
+      try {
+        candidate = (Column) candidate.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+      if (!isVirtualRelation(relation.getCanonicalName())) {
+        candidate.setName(relation.getTableId() + "." + name);
+      }
+
+      return candidate;
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean isVirtualRelation(String relationName) {
+    return relationName.charAt(0) == VIRTUAL_TABLE_PREFIX;
+  }
+
+  public class QueryBlock {
+    private String blockName;
+    private LogicalNode blockRoot;
+    private Map<String, ScanNode> relations = new HashMap<String, ScanNode>();
+    private Projection projection;
+
+    private boolean resolvedGrouping = true;
+    private boolean hasGrouping;
+    private Projectable projectionNode;
+    private SelectionNode selectionNode;
+    private GroupbyNode groupingNode;
+    private Schema schema;
+
+    TargetListManager targetListManager;
+
+    public QueryBlock(String blockName) {
+      this.blockName = blockName;
+    }
+
+    public String getName() {
+      return blockName;
+    }
+
+    public boolean hasRoot() {
+      return blockRoot != null;
+    }
+
+    public void setRoot(LogicalNode blockRoot) {
+      this.blockRoot = blockRoot;
+      queryBlockByNode.put(blockRoot, this);
+    }
+
+    public LogicalNode getRoot() {
+      return blockRoot;
+    }
+
+    public boolean containRelation(String name) {
+      return relations.containsKey(name);
+    }
+
+    public void addRelation(ScanNode relation) {
+      relations.put(relation.getCanonicalName(), relation);
+    }
+
+    public ScanNode getRelation(String name) {
+      return relations.get(name);
+    }
+
+    public Collection<ScanNode> getRelations() {
+      return this.relations.values();
+    }
+
+    public void setSchema(Schema schema) {
+      this.schema = schema;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+
+    public boolean hasTableExpression() {
+      return this.relations.size() > 0;
+    }
+
+    public void setProjection(Projection projection) {
+      this.projection = projection;
+    }
+
+    public Projection getProjection() {
+      return this.projection;
+    }
+
+    public void setProjectionNode(Projectable node) {
+      this.projectionNode = node;
+    }
+
+    public Projectable getProjectionNode() {
+      return this.projectionNode;
+    }
+
+    public boolean isGroupingResolved() {
+      return this.resolvedGrouping;
+    }
+
+    public void resolveGrouping() {
+      this.resolvedGrouping = true;
+    }
+
+    public void setHasGrouping() {
+      hasGrouping = true;
+      resolvedGrouping = false;
+    }
+
+    public boolean hasGrouping() {
+      return hasGrouping || hasGroupingNode();
+    }
+
+    public boolean hasGroupingNode() {
+      return this.groupingNode != null;
+    }
+
+    public void setGroupingNode(GroupbyNode groupingNode) {
+      this.groupingNode = groupingNode;
+    }
+
+    public GroupbyNode getGroupingNode() {
+      return this.groupingNode;
+    }
+
+    public boolean hasSelectionNode() {
+      return this.selectionNode != null;
+    }
+
+    public void setSelectionNode(SelectionNode selectionNode) {
+      this.selectionNode = selectionNode;
+    }
+
+    public SelectionNode getSelectionNode() {
+      return selectionNode;
+    }
+
+    public String toString() {
+      return blockName;
+    }
+
+    public boolean isTargetEvaluated(int targetId) {
+      return targetListManager.isEvaluated(targetId);
+    }
+
+    public boolean canTargetEvaluated(int targetId, LogicalNode node) {
+      return (targetListManager.getTarget(targetId) != null) &&
+          PlannerUtil.canBeEvaluated(targetListManager.getTarget(targetId).getEvalTree(), node);
+    }
+
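+    /**
+     * It requires node's default output schemas. Nodes that are not {@link Projectable} are ignored.
+     * @param node the logical node whose targets and output schema are checked and updated
+     */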
+    public void checkAndSetEvaluatedTargets(LogicalNode node) {
+      if (!(node instanceof Projectable)) {
+        return;
+      }
+
+      // If all columns are projected and do not include any expression
+      if (projection.isAllProjected() && node instanceof ScanNode) {
+        targetListManager = new TargetListManager(LogicalPlan.this, PlannerUtil.schemaToTargets(node.getInSchema()));
+        targetListManager.setEvaluatedAll();
+
+      } else {
+
+        // fill a target if an annotated target can be created.
+        // Some targets which are based on multiple relations can be created only in a certain
+        // join node.
+        for (int i = 0; i < targetListManager.size(); i++) {
+          if (targetListManager.getTarget(i) == null) {
+            try {
+              targetListManager.updateTarget(i,planner.createTarget(LogicalPlan.this, blockName,
+                  projection.getTargets()[i]));
+            } catch (VerifyException e) {
+            }
+          }
+        }
+
+        // add target to list if a target can be evaluated at this node
+        List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
+        for (int i = 0; i < targetListManager.size(); i++) {
+
+          if (targetListManager.getTarget(i) != null && !isTargetEvaluated(i)) {
+            EvalNode expr = targetListManager.getTarget(i).getEvalTree();
+
+            if (canTargetEvaluated(i, node)) {
+
+              if (node instanceof ScanNode) { // for scan node
+                if (expr.getType() == EvalNode.Type.FIELD) {
+                  targetListManager.setEvaluated(i);
+                  if (targetListManager.getTarget(i).hasAlias()) {
+                    newEvaluatedTargetIds.add(i);
+                  }
+                } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+                  // if this expression does no contain any aggregation function
+                  targetListManager.setEvaluated(i);
+                  newEvaluatedTargetIds.add(i);
+                }
+
+              } else if (node instanceof GroupbyNode) { // for grouping
+                if (EvalTreeUtil.findDistinctAggFunction(expr).size() > 0) {
+                  targetListManager.setEvaluated(i);
+                  newEvaluatedTargetIds.add(i);
+                }
+
+              } else if (node instanceof JoinNode) { // for join
+                if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+                  // if this expression does no contain any aggregation function,
+                  targetListManager.setEvaluated(i);
+                  newEvaluatedTargetIds.add(i);
+                }
+              }
+            }
+          }
+        }
+
+        if (node instanceof ScanNode || node instanceof JoinNode) {
+          Schema baseSchema = null;
+          if (node instanceof ScanNode) {
+            baseSchema = ((ScanNode)node).getTableSchema();
+          } else if (node instanceof JoinNode) {
+            baseSchema = node.getInSchema();
+          }
+
+          if (newEvaluatedTargetIds.size() > 0) {
+            // fill addedTargets with output columns and new expression columns (e.g., aliased column or expressions)
+            Target [] addedTargets = new Target[baseSchema.getColumnNum() + newEvaluatedTargetIds.size()];
+            PlannerUtil.schemaToTargets(baseSchema, addedTargets);
+            int baseIdx = baseSchema.getColumnNum();
+            for (int i = 0; i < newEvaluatedTargetIds.size(); i++) {
+              addedTargets[baseIdx + i] = targetListManager.getTarget(newEvaluatedTargetIds.get(i));
+            }
+
+            // set targets to ScanNode because it needs to evaluate expressions
+            ((Projectable)node).setTargets(addedTargets);
+            // the output schema of ScanNode has to have the combination of the original output and newly-added targets.
+            node.setOutSchema(LogicalPlanner.getProjectedSchema(LogicalPlan.this, addedTargets));
+          } else {
+            node.setOutSchema(node.getInSchema());
+          }
+
+          // if newEvaluatedTargetIds == 0, the original output schema will be used.
+        } else if (node instanceof GroupbyNode) {
+          // Set the current targets to the GroupByNode because the GroupByNode is the last projection operator.
+          ((Projectable)node).setTargets(targetListManager.getTargets());
+          node.setOutSchema(targetListManager.getUpdatedSchema());
+        }
+      }
+
+      // replace the evaluated targets for upper operators
+      try {
+        targetListManager.getUpdatedTarget();
+      } catch (CloneNotSupportedException e) {
+        throw new InternalError(e.getMessage());
+      }
+
+      if (targetListManager.isAllEvaluated()) {
+        schema = targetListManager.getUpdatedSchema();
+      }
+    }
+
+    public TargetListManager getTargetListManager() {
+      return targetListManager;
+    }
+
+    public Target [] getCurrentTargets() {
+      return targetListManager.getTargets();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 8f05593..d8bb157 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -18,36 +18,39 @@
 
 package org.apache.tajo.engine.planner;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.function.AggFunction;
+import org.apache.tajo.catalog.function.GeneralFunction;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.parser.*;
-import org.apache.tajo.engine.parser.QueryBlock.*;
+import org.apache.tajo.engine.eval.EvalNode.Type;
+import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.logical.join.Edge;
-import org.apache.tajo.engine.planner.logical.join.JoinTree;
 import org.apache.tajo.engine.query.exception.InvalidQueryException;
 import org.apache.tajo.engine.query.exception.NotSupportQueryException;
+import org.apache.tajo.engine.query.exception.UndefinedFunctionException;
 import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.exception.InternalException;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.Stack;
 
+import static org.apache.tajo.algebra.Aggregation.GroupType;
+
 /**
- * This class creates a logical plan from a parse tree ({@link org.apache.tajo.engine.parser.QueryBlock})
- * generated by {@link org.apache.tajo.engine.parser.QueryAnalyzer}.
+ * This class creates a logical plan from a parse tree ({@link org.apache.tajo.engine.parser.SQLAnalyzer})
+ * generated by {@link org.apache.tajo.engine.parser.SQLAnalyzer}.
  *
- * @see org.apache.tajo.engine.parser.QueryBlock
+ * @see org.apache.tajo.engine.parser
  */
 public class LogicalPlanner {
   private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
@@ -60,358 +63,461 @@ public class LogicalPlanner {
   /**
    * This generates a logical plan.
    *
-   * @param context
-   * @return a initial logical plan
+   * @param expr A relational algebraic expression for a query.
+   * @return A logical plan
    */
-  public LogicalNode createPlan(PlanningContext context) {
-    LogicalNode plan;
+  public LogicalPlan createPlan(Expr expr) {
+
+    LogicalPlan plan = new LogicalPlan(this);
+    LogicalNode subroot = null;
 
+    Stack<ExprType> stack =
+        new Stack<ExprType>();
+
+    QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
     try {
-      plan = createPlanInternal(context, context.getParseTree());
+      subroot = createPlanInternal(plan, rootBlock, expr, stack);
     } catch (CloneNotSupportedException e) {
       throw new InvalidQueryException(e);
+    } catch (VerifyException e) {
+      throw new InvalidQueryException(e); // do not continue with a null subroot
     }
 
     LogicalRootNode root = new LogicalRootNode();
-    root.setInSchema(plan.getOutSchema());
-    root.setOutSchema(plan.getOutSchema());
-    root.setSubNode(plan);
-
-    return root;
-  }
-  
-  private LogicalNode createPlanInternal(PlanningContext ctx,
-      ParseTree query) throws CloneNotSupportedException {
-    LogicalNode plan;
-    
-    switch(query.getType()) {
-    case SELECT:
-      LOG.info("Planning select statement");
-      QueryBlock select = (QueryBlock) query;
-      plan = buildSelectPlan(ctx, select);
-      break;
-      
-    case UNION:
-    case EXCEPT:
-    case INTERSECT:
-      SetStmt set = (SetStmt) query;
-      plan = buildSetPlan(ctx, set);
-      break;
-      
-    case CREATE_INDEX:
-      LOG.info("Planning create index statement");
-      CreateIndexStmt createIndex = (CreateIndexStmt) query;
-      plan = buildCreateIndexPlan(createIndex);
-      break;
-
-    case CREATE_TABLE:
-    case CREATE_TABLE_AS:
-      LOG.info("Planning store statement");
-      CreateTableStmt createTable = (CreateTableStmt) query;
-      plan = buildCreateTablePlan(ctx, createTable);
-      break;
-
-    default:
-      throw new NotSupportQueryException(query.toString());
-    }
-    
+    root.setInSchema(subroot.getOutSchema());
+    root.setOutSchema(subroot.getOutSchema());
+    root.setSubNode(subroot);
+    plan.getRootBlock().setRoot(root);
+
     return plan;
   }
 
-  private LogicalNode buildSetPlan(PlanningContext ctx, SetStmt stmt)
-      throws CloneNotSupportedException {
-    BinaryNode bin;
-    switch (stmt.getType()) {
-    case UNION:
-      bin = new UnionNode();
-      break;
-    case EXCEPT:
-      bin = new ExceptNode();
-      break;
-    case INTERSECT:
-      bin = new IntersectNode();
-      break;
-    default:
-      throw new IllegalStateException("the statement cannot be matched to any set operation type");
-    }
-    
-    bin.setOuter(createPlanInternal(ctx, stmt.getLeftTree()));
-    bin.setInner(createPlanInternal(ctx, stmt.getRightTree()));
-    bin.setInSchema(bin.getOuterNode().getOutSchema());
-    bin.setOutSchema(bin.getOuterNode().getOutSchema());
-    return bin;
-  }
-
-  private LogicalNode buildCreateIndexPlan(CreateIndexStmt stmt) {
-    FromTable table = new FromTable(catalog.getTableDesc(stmt.getTableName()));
-    ScanNode scan = new ScanNode(table);
-    scan.setInSchema(table.getSchema());
-    scan.setOutSchema(table.getSchema());
-    IndexWriteNode indexWrite = new IndexWriteNode(stmt);
-    indexWrite.setSubNode(scan);
-    indexWrite.setInSchema(scan.getOutSchema());
-    indexWrite.setOutSchema(scan.getOutSchema());
-    
-    return indexWrite;
-  }
-
-  private static LogicalNode buildCreateTablePlan(final PlanningContext ctx,
-                                                  final CreateTableStmt query)
-      throws CloneNotSupportedException {
-    LogicalNode node;
+  /**
+   * Relational operators can be divided into two categories as follows:
+   * <ol>
+   *  <li>General operator: operators of this type do not affect the tuple schema and do not evaluate
+   *  expressions. Selection, Sort, and Limit belong to this category.</li>
+   *  <li>Projectable operator: operators of this type affect the tuple schema and evaluate expressions.
+   *  Scan, Groupby, and Join belong to this category.
+   *  </li>
+   * </ol>
+   */
+  private LogicalNode createPlanInternal(LogicalPlan plan, QueryBlock block, Expr expr, Stack<ExprType> stack)
+      throws CloneNotSupportedException, VerifyException {
+    LogicalNode currentNode;
+    QueryBlock currentBlock = checkNewBlockAndGet(plan, block.getName());
 
-    if (query.hasQueryBlock()) {
-      LogicalNode selectPlan = buildSelectPlan(ctx, query.getSelectStmt());
-      StoreTableNode storeNode = new StoreTableNode(query.getTableName());
+    switch(expr.getType()) {
+      case Projection:
+        Projection projection = (Projection) expr;
+        currentNode = buildProjectionNode(plan, currentBlock, projection, stack);
+        break;
 
-      storeNode.setSubNode(selectPlan);
+      case Filter:
+        Selection selection = (Selection) expr;
+        currentNode = buildSelectionNode(plan, currentBlock, selection, stack);
+        break;
 
-      if (query.hasDefinition()) {
-        storeNode.setOutSchema(query.getTableDef());
-      } else {
-        // TODO - strip qualified name
-        storeNode.setOutSchema(selectPlan.getOutSchema());
-      }
-      storeNode.setInSchema(selectPlan.getOutSchema());
+      case Aggregation:
+        Aggregation aggregation = (Aggregation) expr;
+        currentNode = buildGroupingPlan(plan, currentBlock, aggregation, stack);
+        break;
 
-      if (query.hasStoreType()) {
-        storeNode.setStorageType(query.getStoreType());
-      } else {
-        // default type
-        // TODO - it should be configurable.
-        storeNode.setStorageType(CatalogProtos.StoreType.CSV);
-      }
-      if (query.hasOptions()) {
-        storeNode.setOptions(query.getOptions());
-      }
+      case Join:
+        Join join = (Join) expr;
+        currentNode = buildExplicitJoinPlan(plan, currentBlock, join, stack);
+        break;
 
-      node = storeNode;
-    } else {
-      CreateTableNode createTable =
-          new CreateTableNode(query.getTableName(), query.getTableDef());
+      case Sort:
+        Sort sort = (Sort) expr;
+        currentNode = buildSortPlan(plan, currentBlock, sort, stack);
+        break;
 
-      if (query.hasStoreType()) {
-        createTable.setStorageType(query.getStoreType());
-      } else {
-        // default type
-        // TODO - it should be configurable.
-        createTable.setStorageType(CatalogProtos.StoreType.CSV);
-      }
-      if (query.hasOptions()) {
-        createTable.setOptions(query.getOptions());
-      }
+      case Limit:
+        Limit limit = (Limit) expr;
+        currentNode = buildLimitPlan(plan, currentBlock, limit, stack);
+        break;
 
-      if (query.hasPath()) {
-        createTable.setPath(query.getPath());
-      }
-      createTable.setExternal(query.isExternal());
-      node = createTable;
+      case Union:
+      case Except:
+      case Intersect:
+        SetOperation setOp = (SetOperation) expr;
+        currentNode = buildSetPlan(plan, currentBlock, setOp, stack);
+        break;
+
+
+      case RelationList:
+        currentNode = buildRelationListPlan(plan, currentBlock, (RelationList) expr, stack);
+        if (((RelationList) expr).size() == 1) { // skip visitPost because it is already visited
+          return currentNode;
+        }
+        break;
+
+      case Relation:
+        currentNode = buildScanPlan(currentBlock, expr);
+        break;
+
+      case CreateTable:
+        CreateTable createTable = (CreateTable) expr;
+        currentNode = buildCreateTable(plan, currentBlock, createTable, stack);
+        break;
+
+      case DropTable:
+        DropTable dropTable = (DropTable) expr;
+        currentNode = buildDropTable(dropTable);
+        break;
+
+      default:
+        throw new NotSupportQueryException(expr.getType().name());
     }
-    
-    return node;
+
+    // mark the node as the visited node and do post work for each operator
+    plan.postVisit(block.getName(), currentNode, stack);
+    // check and set evaluated targets and update in/out schemas
+    currentBlock.checkAndSetEvaluatedTargets(currentNode);
+    return currentNode;
   }
-  
+
   /**
-   * ^(SELECT from_clause? where_clause? groupby_clause? selectList)
-   * 
-   * @param query
-   * @return the planed logical plan
+   * It checks whether a query block with the given name already exists. If not, it creates and adds a new query block.
+   * In either case, it returns the query block corresponding to the block name.
    */
-  private static LogicalNode buildSelectPlan(PlanningContext ctx,
-                                             QueryBlock query)
-      throws CloneNotSupportedException {
-    LogicalNode subroot;
-    EvalNode whereCondition = null;
-    EvalNode [] cnf = null;
-    if(query.hasWhereClause()) {
-      whereCondition = query.getWhereCondition();
-      whereCondition = AlgebraicUtil.simplify(whereCondition);
-      cnf = EvalTreeUtil.getConjNormalForm(whereCondition);
-    }
-
-    if(query.hasFromClause()) {
-      if (query.hasExplicitJoinClause()) {
-        subroot = createExplicitJoinTree(query);
-      } else {
-        subroot = createImplicitJoinTree(query.getFromTables(), cnf);
-      }
+  private QueryBlock checkNewBlockAndGet(LogicalPlan plan, String blockName) {
+    QueryBlock block = plan.getBlock(blockName);
+    if (block == null) {
+      return plan.newAndGetBlock(blockName);
     } else {
-      subroot = new EvalExprNode(query.getTargetList());
-      subroot.setOutSchema(getProjectedSchema(ctx, query.getTargetList()));
-      return subroot;
-    }
-    
-    if(whereCondition != null) {
-      SelectionNode selNode = 
-          new SelectionNode(query.getWhereCondition());
-      selNode.setSubNode(subroot);
-      selNode.setInSchema(subroot.getOutSchema());
-      selNode.setOutSchema(selNode.getInSchema());
-      subroot = selNode;
-    }
-    
-    if(query.hasAggregation()) {
-      if (query.isDistinct()) {
-        throw new InvalidQueryException("Cannot support GROUP BY queries with distinct keyword");
+      return block;
+    }
+  }
+
+  private ScanNode buildScanPlan(QueryBlock block, Expr expr)
+      throws VerifyException {
+    // 1. init phase: nothing to initialize for a scan
+
+    // 2. build child plans: no child expression is visited here
+    // 3. build scan plan: look up the table description and wrap it into a scan node
+    Relation relation = (Relation) expr; // the caller dispatches here for the Relation case
+    TableDesc desc = catalog.getTableDesc(relation.getName()); // NOTE(review): not null-checked - confirm how an unknown table is reported
+    FromTable fromTable = new FromTable(desc);
+
+    if (relation.hasAlias()) { // carry the alias of the relation over to the FromTable
+      fromTable.setAlias(relation.getAlias());
+    }
+
+    ScanNode scanNode = new ScanNode(fromTable);
+
+    return scanNode;
+  }
+
+  /*===============================================================================================
+    JOIN SECTION
+   ===============================================================================================*/
+  private LogicalNode buildRelationListPlan(LogicalPlan plan, QueryBlock block,
+                                            RelationList relations, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+
+    LogicalNode current = createPlanInternal(plan, block, relations.getRelations()[0], stack); // plan of the first relation
+
+    LogicalNode left;
+    LogicalNode right;
+    if (relations.size() > 1) { // a single relation is returned as it is, without any join
+
+      for (int i = 1; i < relations.size(); i++) {
+        left = current; // everything planned so far becomes the left side
+        right = createPlanInternal(plan, block, relations.getRelations()[i], stack);
+        current = createCatasianProduct(left, right); // cross join with the next relation
       }
+    }
 
-      GroupbyNode groupbyNode = null;
-      if (query.hasGroupbyClause()) {
-        if (query.getGroupByClause().getGroupSet().get(0).getType() == GroupType.GROUPBY) {          
-          groupbyNode = new GroupbyNode(query.getGroupByClause().getGroupSet().get(0).getColumns());
-          groupbyNode.setTargetList(query.getTargetList());
-          groupbyNode.setSubNode(subroot);
-          groupbyNode.setInSchema(subroot.getOutSchema());
-          Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
-          groupbyNode.setOutSchema(outSchema);
-          subroot = groupbyNode;
-        } else if (query.getGroupByClause().getGroupSet().get(0).getType() == GroupType.CUBE) {
-          LogicalNode union = createGroupByUnionByCube(ctx, query,
-              subroot, query.getGroupByClause());
-          Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
-          union.setOutSchema(outSchema);
-          subroot = union;
-        }
-        if(query.hasHavingCond())
-          groupbyNode.setHavingCondition(query.getHavingCond());
+    return current;
+  }
+
+  private LogicalNode buildExplicitJoinPlan(LogicalPlan plan, QueryBlock block, Join join, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+    // Phase 1: Init (nothing to initialize)
+    // Phase 2: build child plans; JOIN stays on the stack only while both sides are planned
+    stack.push(ExprType.JOIN);
+    LogicalNode left = createPlanInternal(plan, block, join.getLeft(), stack);
+    LogicalNode right = createPlanInternal(plan, block, join.getRight(), stack);
+    stack.pop();
+
+    // Phase 3: build this plan
+    JoinNode joinNode = new JoinNode(join.getJoinType(), left, right);
+
+    // Set a merged schema, which serves as both the input and the output schema of the join
+    Schema merged;
+    if (join.isNatural()) {
+      merged = getNaturalJoin(left, right);
+    } else {
+      merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+    }
+    joinNode.setInSchema(merged);
+    joinNode.setOutSchema(merged);
+
+    // Determine join conditions
+    if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
+      Schema leftSchema = joinNode.getOuterNode().getInSchema(); // NOTE(review): in-schemas here, but merged above is built from out-schemas - confirm
+      Schema rightSchema = joinNode.getInnerNode().getInSchema();
+      Schema commons = SchemaUtil.getCommons(leftSchema, rightSchema);
+      EvalNode njCond = getNaturalJoinCondition(leftSchema, rightSchema, commons);
+      joinNode.setJoinQual(njCond);
+    } else if (join.hasQual()) { // otherwise, the given join conditions are set
+      joinNode.setJoinQual(createEvalTree(plan, block.getName(), join.getQual()));
+    } // a join that is neither natural nor qualified gets no join condition
+
+    return joinNode;
+  }
+
+  private static EvalNode getNaturalJoinCondition(Schema outer, Schema inner, Schema commons) {
+    EvalNode njQual = null; // stays null when commons has no column
+    EvalNode equiQual;
+
+    Column leftJoinKey;
+    Column rightJoinKey;
+    for (Column common : commons.getColumns()) {
+      leftJoinKey = outer.getColumnByName(common.getColumnName()); // the same-named column on each side
+      rightJoinKey = inner.getColumnByName(common.getColumnName());
+      equiQual = new BinaryEval(EvalNode.Type.EQUAL, // leftJoinKey = rightJoinKey
+          new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
+      if (njQual == null) { // the first equality starts the condition
+        njQual = equiQual;
       } else {
-        // when aggregation functions are used without grouping fields
-        groupbyNode = new GroupbyNode(new Column[] {});
-        groupbyNode.setTargetList(query.getTargetList());
-        groupbyNode.setSubNode(subroot);
-        groupbyNode.setInSchema(subroot.getOutSchema());
-        Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
-        groupbyNode.setOutSchema(outSchema);
-        subroot = groupbyNode;
+        njQual = new BinaryEval(EvalNode.Type.AND, // later equalities are AND-ed onto the condition built so far
+            njQual, equiQual);
       }
     }
-    
-    if(query.hasOrderByClause()) {
-      SortNode sortNode = new SortNode(query.getSortKeys());
-      sortNode.setSubNode(subroot);
-      sortNode.setInSchema(subroot.getOutSchema());
-      sortNode.setOutSchema(sortNode.getInSchema());
-      subroot = sortNode;
-    }
-
-    ProjectionNode prjNode;
-    if (query.getProjectAll()) {
-      Schema merged = SchemaUtil.merge(query.getFromTables());
-      Target [] allTargets = PlannerUtil.schemaToTargets(merged);
-      prjNode = new ProjectionNode(allTargets);
-      prjNode.setSubNode(subroot);
-      prjNode.setInSchema(merged);
-      prjNode.setOutSchema(merged);
-      subroot = prjNode;
-      query.setTargetList(allTargets);
-    } else {
-      prjNode = new ProjectionNode(query.getTargetList());
-      if (subroot != null) { // false if 'no from' statement
-        prjNode.setSubNode(subroot);
+
+    return njQual;
+  }
+
+  private static LogicalNode createCatasianProduct(LogicalNode left, LogicalNode right) { // i.e., Cartesian product
+    JoinNode join = new JoinNode(JoinType.CROSS_JOIN, left, right);
+    Schema joinSchema = SchemaUtil.merge( // the merge of both children's output schemas
+        join.getOuterNode().getOutSchema(),
+        join.getInnerNode().getOutSchema());
+    join.setInSchema(joinSchema); // the same schema serves as the input and the output of the cross join
+    join.setOutSchema(joinSchema);
+
+    return join;
+  }
+
+  private static Schema getNaturalJoin(LogicalNode outer, LogicalNode inner) { // builds the schema of a natural join
+    Schema joinSchema = new Schema();
+    Schema commons = SchemaUtil.getCommons(outer.getOutSchema(), inner.getOutSchema());
+    joinSchema.addColumns(commons); // the common columns come first, then the remaining columns of each side
+    outerColumns: for (Column c : outer.getOutSchema().getColumns()) {
+      for (Column common : commons.getColumns()) {
+        if (common.getColumnName().equals(c.getColumnName())) {
+          continue outerColumns; // already added above as a common column
+        }
       }
-      prjNode.setInSchema(subroot.getOutSchema());
-
-      // All aggregate functions are evaluated before the projection.
-      // So, the targets for aggregate functions should be updated.
-      LogicalOptimizer.TargetListManager tlm = new LogicalOptimizer.
-          TargetListManager(ctx, query.getTargetList());
-      for (int i = 0; i < tlm.getTargets().length; i++) {
-        if (EvalTreeUtil.findDistinctAggFunction(tlm.getTarget(i).getEvalTree()).size() > 0) {
-          tlm.setEvaluated(i);
+      joinSchema.addColumn(c); // added exactly once, whether there are zero, one or many common columns
+    }
+    innerColumns: for (Column c : inner.getOutSchema().getColumns()) {
+      for (Column common : commons.getColumns()) {
+        if (common.getColumnName().equals(c.getColumnName())) {
+          continue innerColumns; // already added above as a common column
         }
       }
-      prjNode.setTargetList(tlm.getUpdatedTarget());
-      Schema projected = getProjectedSchema(ctx, tlm.getUpdatedTarget());
-      prjNode.setOutSchema(projected);
-      subroot = prjNode;
+      joinSchema.addColumn(c); // same rule as for the outer side
     }
+    return joinSchema;
+  }
+
+  /*===============================================================================================
+    SET OPERATION SECTION
+   ===============================================================================================*/
+  private LogicalNode buildSetPlan(LogicalPlan plan, QueryBlock block, SetOperation setOperation,
+                                   Stack<ExprType> stack) throws VerifyException, CloneNotSupportedException {
+    LogicalNode left;
+    LogicalNode right;
 
-    GroupbyNode dupRemoval;
-    if (query.isDistinct()) {
-      dupRemoval = new GroupbyNode(subroot.getOutSchema().toArray());
-      dupRemoval.setTargetList(query.getTargetList());
-      dupRemoval.setSubNode(subroot);
-      dupRemoval.setInSchema(subroot.getOutSchema());
-      Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
-      dupRemoval.setOutSchema(outSchema);
-      subroot = dupRemoval;
+    QueryBlock leftBlock = plan.newAnonymousBlock(); // each side is planned in its own anonymous block
+    Stack<ExprType> leftStack = new Stack<ExprType>(); // and with its own, initially empty, stack
+    left = createPlanInternal(plan, leftBlock, setOperation.getLeft(), leftStack);
+    Stack<ExprType> rightStack = new Stack<ExprType>();
+    QueryBlock rightBlock = plan.newAnonymousBlock();
+    right = createPlanInternal(plan, rightBlock, setOperation.getRight(), rightStack);
+
+    verifySetStatement(setOperation.getType(), leftBlock, rightBlock); // throws if the targets of both sides do not match
+
+    BinaryNode setOp;
+    if (setOperation.getType() == OpType.Union) {
+      setOp = new UnionNode(left, right);
+    } else if (setOperation.getType() == OpType.Except) {
+      setOp = new ExceptNode(left, right);
+    } else if (setOperation.getType() == OpType.Intersect) {
+      setOp = new IntersectNode(left, right);
+    } else {
+      throw new VerifyException(setOperation.toJson()); // any other operator type is rejected
     }
 
-    if (query.hasLimitClause()) {
-      LimitNode limitNode = new LimitNode(query.getLimitClause());
-      limitNode.setSubNode(subroot);
-      limitNode.setInSchema(subroot.getOutSchema());
-      limitNode.setOutSchema(limitNode.getInSchema());
-      subroot = limitNode;
+    // Strip the table names from the targets of the left block;
+    // the output schema of the set operation is derived from these stripped targets.
+    Target [] leftStrippedTargets = PlannerUtil.stripTarget(leftBlock.getCurrentTargets());
+
+    Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
+    setOp.setInSchema(left.getOutSchema());
+    setOp.setOutSchema(outSchema);
+    setOp.setOuter(left); // NOTE(review): left and right were already passed to the constructor - presumably redundant
+    setOp.setInner(right);
+
+    if (isNoUpperProjection(stack)) { // the enclosing block takes the stripped left targets as its own, all marked evaluated
+//      ProjectionNode projectionNode = new ProjectionNode(leftStrippedTargets);
+//      projectionNode.setSubNode(setOp);
+
+//      localBlock.setProjectionNode(projectionNode);
+      block.targetListManager = new TargetListManager(plan, leftStrippedTargets);
+      block.targetListManager.setEvaluatedAll();
+      block.targetListManager.getUpdatedTarget(); // NOTE(review): the return value is discarded - confirm it is called for a side effect
+      block.setSchema(block.targetListManager.getUpdatedSchema());
     }
-    
-    return subroot;
+
+    return setOp;
   }
 
-  public static LogicalNode createGroupByUnionByCube(
-      final PlanningContext context,
-      final QueryBlock queryBlock,
-      final LogicalNode subNode,
-      final GroupByClause clause) {
+  private boolean verifySetStatement(OpType type, QueryBlock left, QueryBlock right)
+      throws VerifyException { // returns true, or throws if the targets of both blocks do not match
 
-    GroupElement element = clause.getGroupSet().get(0);
+    if (left.getCurrentTargets().length != right.getCurrentTargets().length) { // both sides need the same number of targets
+      throw new VerifyException("ERROR: each " + type.name() + " query must have the same number of columns");
+    }
 
-    List<Column []> cuboids  = generateCuboids(element.getColumns());
+    Target [] targets1 = left.getCurrentTargets();
+    Target [] targets2 = right.getCurrentTargets();
 
-    return createGroupByUnion(context, queryBlock, subNode, cuboids, 0);
+    for (int i = 0; i < targets1.length; i++) { // targets are compared position by position
+      if (!targets1[i].getDataType().equals(targets2[i].getDataType())) {
+        throw new VerifyException(type.name() + " types " + targets1[i].getDataType().getType() + " and " // names the actual operator
+            + targets2[i].getDataType().getType() + " cannot be matched");
+      }
+    }
+
+    return true;
   }
 
-  private static Target [] cloneTargets(Target [] srcs)
-      throws CloneNotSupportedException {
-    Target [] clone = new Target[srcs.length];
-    for (int i = 0; i < srcs.length; i++) {
-      clone[i] = (Target) srcs[i].clone();
-    }
+  private SelectionNode buildSelectionNode(LogicalPlan plan, QueryBlock block, Selection selection,
+                                           Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+    // 1. init phase: nothing to initialize
 
-    return clone;
+    // 2. build child plans: SELECTION stays on the stack only while the child is planned
+    stack.push(ExprType.SELECTION);
+    LogicalNode child = createPlanInternal(plan, block, selection.getChild(), stack);
+    stack.pop();
+
+    // 3. build this plan: the qualifier of the selection becomes the search condition
+    EvalNode searchCondition = createEvalTree(plan, block.getName(), selection.getQual());
+    SelectionNode selectionNode = new SelectionNode(searchCondition);
+
+    // 4. set child plan, update input/output schemas: both schemas are the child's output schema
+    selectionNode.setSubNode(child);
+    selectionNode.setInSchema(child.getOutSchema());
+    selectionNode.setOutSchema(child.getOutSchema());
+
+    // 5. update block information:
+    block.setSelectionNode(selectionNode);
+
+    return selectionNode;
+  }
+
+  /*===============================================================================================
+    GROUP BY SECTION
+   ===============================================================================================*/
+
+  private LogicalNode buildGroupingPlan(LogicalPlan plan, QueryBlock block, Aggregation aggregation,
+                                        Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+
+    // 1. Initialization Phase: nothing to initialize
+
+    // 2. Build Child Plan Phase: GROUP_BY stays on the stack only while the child is planned
+    stack.push(ExprType.GROUP_BY);
+    LogicalNode child = createPlanInternal(plan, block, aggregation.getChild(), stack);
+    stack.pop();
+
+    // 3. Build This Plan: the type of the first group element selects group-by or cube
+    Aggregation.GroupElement [] groupElements = aggregation.getGroupSet();
+
+    if (groupElements[0].getType() == GroupType.OrdinaryGroup) { // for group-by
+      GroupElement annotatedElements [] = new GroupElement[groupElements.length];
+      for (int i = 0; i < groupElements.length; i++) {
+        annotatedElements[i] = new GroupElement(
+            groupElements[i].getType(),
+            annotateGroupingColumn(plan, block.getName(), groupElements[i].getColumns(), child));
+      }
+      GroupbyNode groupingNode = new GroupbyNode(annotatedElements[0].getColumns()); // only the first element is used
+      if (aggregation.hasHavingCondition()) {
+        groupingNode.setHavingCondition(
+            createEvalTree(plan, block.getName(), aggregation.getHavingCondition()));
+      }
+
+      // 4. Set Child Plan and Update Input Schemes Phase
+      groupingNode.setSubNode(child);
+      block.setGroupingNode(groupingNode);
+      groupingNode.setInSchema(child.getOutSchema()); // the input of this node is what the child outputs
+
+      // 5. Update Output Schema and Targets for Upper Plan: nothing is done here (presumably left to the caller)
+
+      return groupingNode;
+
+    } else if (groupElements[0].getType() == GroupType.Cube) { // for cube by
+      List<Column []> cuboids  = generateCuboids(annotateGroupingColumn(plan, block.getName(),
+          groupElements[0].getColumns(), child));
+      UnionNode topUnion = createGroupByUnion(plan, block, child, cuboids, 0); // a union of one group-by per cuboid
+      block.resolveGrouping();
+      block.getTargetListManager().setEvaluatedAll();
+
+      return topUnion;
+    } else {
+      throw new InvalidQueryException("Not support grouping");
+    }
   }
 
-  private static UnionNode createGroupByUnion(final PlanningContext context,
-                                              final QueryBlock queryBlock,
-                                              final LogicalNode subNode,
-                                              final List<Column []> cuboids,
-                                              final int idx) {
+  private UnionNode createGroupByUnion(final LogicalPlan plan,
+                                       final QueryBlock block,
+                                       final LogicalNode subNode,
+                                       final List<Column []> cuboids,
+                                       final int idx) {
     UnionNode union;
     try {
       if ((cuboids.size() - idx) > 2) {
         GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
-        Target [] clone = cloneTargets(queryBlock.getTargetList());
+        Target [] clone = cloneTargets(block.getCurrentTargets());
 
-        g1.setTargetList(clone);
+        g1.setTargets(clone);
         g1.setSubNode((LogicalNode) subNode.clone());
         g1.setInSchema(g1.getSubNode().getOutSchema());
-        Schema outSchema = getProjectedSchema(context, queryBlock.getTargetList());
+        Schema outSchema = getProjectedSchema(plan, block.getCurrentTargets());
         g1.setOutSchema(outSchema);
 
-        union = new UnionNode(g1, createGroupByUnion(context, queryBlock,
-            subNode, cuboids, idx+1));
+        LogicalNode right = createGroupByUnion(plan, block, subNode, cuboids, idx+1);
+        union = new UnionNode(g1, right);
         union.setInSchema(g1.getOutSchema());
         union.setOutSchema(g1.getOutSchema());
+
         return union;
       } else {
         GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
-        Target [] clone = cloneTargets(queryBlock.getTargetList());
-        g1.setTargetList(clone);
+        Target [] clone = cloneTargets(block.getCurrentTargets());
+        g1.setTargets(clone);
         g1.setSubNode((LogicalNode) subNode.clone());
         g1.setInSchema(g1.getSubNode().getOutSchema());
-        Schema outSchema = getProjectedSchema(context, queryBlock.getTargetList());
+        Schema outSchema = getProjectedSchema(plan, clone);
         g1.setOutSchema(outSchema);
 
         GroupbyNode g2 = new GroupbyNode(cuboids.get(idx+1));
-        clone = cloneTargets(queryBlock.getTargetList());
-        g2.setTargetList(clone);
+        clone = cloneTargets(block.getCurrentTargets());
+        g2.setTargets(clone);
         g2.setSubNode((LogicalNode) subNode.clone());
         g2.setInSchema(g1.getSubNode().getOutSchema());
-        outSchema = getProjectedSchema(context, queryBlock.getTargetList());
+        outSchema = getProjectedSchema(plan, clone);
         g2.setOutSchema(outSchema);
         union = new UnionNode(g1, g2);
         union.setInSchema(g1.getOutSchema());
         union.setOutSchema(g1.getOutSchema());
+
         return union;
       }
     } catch (CloneNotSupportedException cnse) {
@@ -419,17 +525,40 @@ public class LogicalPlanner {
       throw new InvalidQueryException(cnse);
     }
   }
-  
-  public static final Column [] ALL 
-    = Lists.newArrayList().toArray(new Column[0]);
-  
+
+  /**
+   * It transforms a list of column references into a list of annotated columns while considering aliased expressions.
+   */
+  private Column [] annotateGroupingColumn(LogicalPlan plan, String blockName,
+                                           ColumnReferenceExpr[] columnRefs, LogicalNode child)
+      throws VerifyException {
+    Column [] columns = new Column[columnRefs.length]; // one annotated column per reference, in the same order
+    for (int i = 0; i < columnRefs.length; i++) {
+      columns[i] = plan.findColumnFromChildNode(columnRefs[i], blockName, child); // resolved by the plan against the child node
+    }
+
+    return columns;
+  }
+
+  private static Target [] cloneTargets(Target [] sourceTargets) // returns a new array holding a clone of each target
+      throws CloneNotSupportedException {
+    Target [] clone = new Target[sourceTargets.length];
+    for (int i = 0; i < sourceTargets.length; i++) {
+      clone[i] = (Target) sourceTargets[i].clone(); // each element is cloned, not shared with the source array
+    }
+
+    return clone;
+  }
+
+  public static final Column [] ALL= Lists.newArrayList().toArray(new Column[0]); // an empty column array; generateCuboids adds it as the first cuboid
+
   public static List<Column []> generateCuboids(Column [] columns) {
     int numCuboids = (int) Math.pow(2, columns.length);
-    int maxBits = columns.length;    
-    
+    int maxBits = columns.length;
+
     List<Column []> cube = Lists.newArrayList();
     List<Column> cuboidCols;
-    
+
     cube.add(ALL);
     for (int cuboidId = 1; cuboidId < numCuboids; cuboidId++) {
       cuboidCols = Lists.newArrayList();
@@ -444,207 +573,448 @@ public class LogicalPlanner {
     return cube;
   }
 
-  private static LogicalNode createExplicitJoinTree(QueryBlock block) {
-    return createExplicitJoinTree_(block.getJoinClause());
+  /*===============================================================================================
+    SORT SECTION
+   ===============================================================================================*/
+
+  private SortNode buildSortPlan(LogicalPlan plan, QueryBlock block, Sort sort, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+
+    // 1. Initialization Phase: nothing to initialize
+    // 2. Build Child Plans: SORT stays on the stack while the child is planned and a group-by is inserted if needed
+    stack.push(ExprType.SORT);
+    LogicalNode child = createPlanInternal(plan, block, sort.getChild(), stack);
+    child = insertGroupingIfUnresolved(plan, block.getName(), child, stack); // wraps the child only if the grouping is unresolved
+    stack.pop();
+
+    // 3. Build this plan: annotate each sort key with the column found from the child node
+    SortSpec [] annotatedSortSpecs = new SortSpec[sort.getSortSpecs().length];
+    Column column;
+    Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
+    for (int i = 0; i < sort.getSortSpecs().length; i++) {
+      column = plan.findColumnFromChildNode(sortSpecs[i].getKey(), block.getName(), child);
+      annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), // direction and null ordering are kept
+          sortSpecs[i].isNullFirst());
+    }
+    SortNode sortNode = new SortNode(annotatedSortSpecs);
+
+    // 4. Set Child Plan, Update Input/Output Schemas: both schemas are the child's output schema
+    sortNode.setSubNode(child);
+    sortNode.setInSchema(child.getOutSchema());
+    sortNode.setOutSchema(child.getOutSchema());
+
+    return sortNode;
+  }
+
+  private LimitNode buildLimitPlan(LogicalPlan plan, QueryBlock block, Limit limit, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+    // build child plans; LIMIT stays on the stack only while the child is planned
+    stack.push(ExprType.LIMIT);
+    LogicalNode child = createPlanInternal(plan, block, limit.getChild(), stack);
+    stack.pop();
+
+    // build limit plan: the fetch count is evaluated right here, at planning time
+    EvalNode firstFetchNum = createEvalTree(plan, block.getName(), limit.getFetchFirstNum());
+    firstFetchNum.eval(null, null, null); // NOTE(review): all arguments are null, so the expression presumably must be a constant - confirm
+    LimitNode limitNode = new LimitNode(firstFetchNum.terminate(null).asInt8());
+
+    // set child plan and update input/output schemas; both schemas are the child's output schema
+    limitNode.setSubNode(child);
+    limitNode.setInSchema(child.getOutSchema());
+    limitNode.setOutSchema(child.getOutSchema());
+    return limitNode;
   }
-  
-  private static LogicalNode createExplicitJoinTree_(JoinClause joinClause) {
 
-    JoinNode join;
-    if (joinClause.hasLeftJoin()) {
-      LogicalNode outer = createExplicitJoinTree_(joinClause.getLeftJoin());
-      join = new JoinNode(joinClause.getJoinType(), outer);
-      join.setInner(new ScanNode(joinClause.getRight()));
-    } else {
-      join = new JoinNode(joinClause.getJoinType(), new ScanNode(joinClause.getLeft()),
-          new ScanNode(joinClause.getRight()));
+  /*===============================================================================================
+    PROJECTION SECTION
+   ===============================================================================================*/
+
+  private LogicalNode buildProjectionNode(LogicalPlan plan, QueryBlock block,
+                                          Projection projection, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+
+    //1: init Phase: register the projection; a target list manager is created unless all columns are projected
+    block.setProjection(projection);
+    if (!projection.isAllProjected()) {
+      block.targetListManager = new TargetListManager(plan, projection.size());
     }
-    if (joinClause.hasJoinQual()) {
-      join.setJoinQual(joinClause.getJoinQual());
-    } else if (joinClause.hasJoinColumns()) { 
-      // for using clause of explicit join
-      // TODO - to be implemented. Now, tajo only support 'ON' join clause.
+
+    if (!projection.hasChild()) { // no child expression: only the target expressions are evaluated
+      EvalExprNode evalOnly =
+          new EvalExprNode(annotateTargets(plan, block.getName(), projection.getTargets()));
+      evalOnly.setOutSchema(getProjectedSchema(plan, evalOnly.getExprs()));
+      block.setProjectionNode(evalOnly);
+      for (int i = 0; i < evalOnly.getTargets().length; i++) {
+        block.targetListManager.updateTarget(i, evalOnly.getTargets()[i]); // NOTE(review): the manager is assigned above only if not all columns are projected - confirm it cannot be null here
+      }
+      return evalOnly;
     }
-    
-    // Determine Join Schemas
-    Schema merged;
-    if (joinClause.isNatural()) {
-      merged = getNaturalJoin(join.getOuterNode(), join.getInnerNode());
+
+    // 2: Build Child Plans; PROJECTION stays on the stack while the child is planned and a group-by is inserted if needed
+    stack.push(ExprType.PROJECTION);
+    LogicalNode child = createPlanInternal(plan, block, projection.getChild(), stack);
+    child = insertGroupingIfUnresolved(plan, block.getName(), child, stack);
+    stack.pop();
+
+    // All targets must be evaluable before the projection.
+    Preconditions.checkState(block.getTargetListManager().isAllEvaluated(),
+        "Some targets cannot be evaluated in the query block \"%s\"", block.getName());
+
+    ProjectionNode projectionNode;
+    if (projection.isAllProjected()) { // all columns projected: the targets are derived from the child's output schema
+      projectionNode = new ProjectionNode(PlannerUtil.schemaToTargets(child.getOutSchema()));
     } else {
-      merged = SchemaUtil.merge(join.getOuterNode().getOutSchema(),
-          join.getInnerNode().getOutSchema());
-    }
-    
-    join.setInSchema(merged);
-    join.setOutSchema(merged);
-    
-    // Determine join quals
-    // if natural join, should have the equi join conditions on common columns
-    if (joinClause.isNatural()) {
-      Schema leftSchema = join.getOuterNode().getOutSchema();
-      Schema rightSchema = join.getInnerNode().getOutSchema();
-      Schema commons = SchemaUtil.getCommons(
-          leftSchema, rightSchema);
-      EvalNode njCond = getNaturalJoinCondition(leftSchema, rightSchema, commons);
-      join.setJoinQual(njCond);
-    } else if (joinClause.hasJoinQual()) { 
-      // otherwise, the given join conditions are set
-      join.setJoinQual(joinClause.getJoinQual());
+      projectionNode = new ProjectionNode(block.getCurrentTargets());
     }
-    
-    return join;
-  }
-  
-  private static EvalNode getNaturalJoinCondition(Schema outer, Schema inner, Schema commons) {
-    EvalNode njQual = null;
-    EvalNode equiQual;
-    
-    Column leftJoinKey;
-    Column rightJoinKey;
-    for (Column common : commons.getColumns()) {
-      leftJoinKey = outer.getColumnByName(common.getColumnName());
-      rightJoinKey = inner.getColumnByName(common.getColumnName());
-      equiQual = new BinaryEval(EvalNode.Type.EQUAL,
-          new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
-      if (njQual == null) {
-        njQual = equiQual;
-      } else {
-        njQual = new BinaryEval(EvalNode.Type.AND,
-            njQual, equiQual);
+
+    block.setProjectionNode(projectionNode);
+    projectionNode.setOutSchema(getProjectedSchema(plan, projectionNode.getTargets()));
+    projectionNode.setInSchema(child.getOutSchema());
+    projectionNode.setSubNode(child);
+
+    if (projection.isDistinct() && block.hasGrouping()) {
+      throw new VerifyException("Cannot support grouping and distinct at the same time");
+    } else {
+      if (projection.isDistinct()) { // duplicate removal is planned as a group-by over all output columns
+        Schema outSchema = projectionNode.getOutSchema();
+        GroupbyNode dupRemoval = new GroupbyNode(outSchema.toArray());
+        dupRemoval.setTargets(block.getTargetListManager().getTargets());
+        dupRemoval.setInSchema(child.getOutSchema());
+        dupRemoval.setOutSchema(outSchema);
+        dupRemoval.setSubNode(child);
+        projectionNode.setSubNode(dupRemoval); // the group-by is placed between the child and the projection
       }
     }
-    
-    return njQual;
+
+    return projectionNode;
   }
-  
-  private static LogicalNode createImplicitJoinTree(FromTable [] tables,
-                                                    EvalNode [] cnf) {
-    if (cnf == null) {
-      return createCatasianProduct(tables);
+
+  /**
+   * It inserts a group-by operator with no grouping column on top of the given child,
+   * only if the grouping of the query block is not resolved yet. Otherwise, it returns the child as it is.
+   */
+  private LogicalNode insertGroupingIfUnresolved(LogicalPlan plan, String blockName,
+                                                 LogicalNode child, Stack<ExprType> stack) {
+    QueryBlock block = plan.getBlock(blockName);
+    if (!block.isGroupingResolved()) {
+      GroupbyNode groupbyNode = new GroupbyNode(new Column[] {}); // a group-by without any grouping column
+      groupbyNode.setTargets(block.getCurrentTargets());
+      groupbyNode.setSubNode(child);
+      groupbyNode.setInSchema(child.getOutSchema());
+
+      plan.postVisit(blockName, groupbyNode, stack); // the same post work that createPlanInternal does for each node
+      block.checkAndSetEvaluatedTargets(groupbyNode);
+      return groupbyNode;
     } else {
-      return createCrossJoinFromJoinCondition(tables, cnf);
+      return child;
     }
   }
 
-  private static LogicalNode createCrossJoinFromJoinCondition(
-      FromTable [] tables, EvalNode [] cnf) {
-    Map<String, FromTable> fromTableMap = Maps.newHashMap();
-    for (FromTable f : tables) {
-      // TODO - to consider alias and self-join
-      fromTableMap.put(f.getTableName(), f);
+  private boolean isNoUpperProjection(Stack<ExprType> stack) { // true only if every type on the stack is PROJECTION, GROUP_BY or JOIN
+    for (ExprType node : stack) {
+      if (!( (node == ExprType.PROJECTION) || (node == ExprType.GROUP_BY) || (node == ExprType.JOIN) )) {
+        return false; // any other expression type on the stack makes the answer false
+      }
     }
 
-    JoinTree joinTree = new JoinTree(); // to infer join order
-    for (EvalNode expr : cnf) {
-      if (PlannerUtil.isJoinQual(expr)) {
-        joinTree.addJoin(expr);
+    return true; // also reached when the stack is empty
+  }
+
+  /*===============================================================================================
+    Data Definition Language (DDL) SECTION
+   ===============================================================================================*/
+
+  private LogicalNode buildCreateTable(LogicalPlan plan, QueryBlock block, CreateTable expr, Stack<ExprType> stack)
+      throws VerifyException, CloneNotSupportedException {
+
+    String tableName = expr.getTableName();
+
+    if (expr.hasSubQuery()) { // with a subquery: the subquery plan is stored into the table
+      stack.push(ExprType.CREATE_TABLE);
+      LogicalNode subQuery = createPlanInternal(plan, block, expr.getSubQuery(), stack);
+      stack.pop();
+      StoreTableNode storeNode = new StoreTableNode(tableName);
+      storeNode.setSubNode(subQuery);
+
+      if (expr.hasTableElements()) { // explicit column definitions take precedence over the subquery's output schema
+        Schema schema = convertTableElementsSchema(expr.getTableElements());
+        storeNode.setOutSchema(schema);
+      } else {
+        storeNode.setOutSchema(subQuery.getOutSchema());
+      }
+      storeNode.setInSchema(subQuery.getOutSchema());
+
+      if (expr.hasStorageType()) {
+        storeNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+      } else {
+        // default type
+        // TODO - it should be configurable.
+        storeNode.setStorageType(CatalogProtos.StoreType.CSV);
+      }
+
+      if (expr.hasParams()) {
+        Options options = new Options();
+        options.putAll(expr.getParams());
+        storeNode.setOptions(options);
+      }
+
+      return storeNode;
+    } else { // without a subquery: only the table definition is planned
+      CreateTableNode createTableNode = new CreateTableNode(tableName,
+          convertTableElementsSchema(expr.getTableElements())); // NOTE(review): not guarded by hasTableElements() here - confirm
+
+      if (expr.isExternal()) {
+        createTableNode.setExternal(true);
+      }
+
+      if (expr.hasStorageType()) {
+        createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+      } else {
+        // default type
+        // TODO - it should be configurable.
+        createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+      }
+      if (expr.hasParams()) {
+        Options options = new Options();
+        options.putAll(expr.getParams());
+        createTableNode.setOptions(options);
+      }
+
+      if (expr.hasLocation()) {
+        createTableNode.setPath(new Path(expr.getLocation()));
       }
+
+      return createTableNode;
     }
+  }
 
-    List<String> remain = Lists.newArrayList(fromTableMap.keySet());
-    remain.removeAll(joinTree.getTables()); // only remain joins not matched to any join condition
-    List<Edge> joinOrder = null;
-    LogicalNode subroot = null;
-    JoinNode join;
-    Schema joinSchema;
+  /**
+   * Converts the column definitions of a table into a schema.
+   *
+   * @param elements column definitions to be converted
+   * @return a schema holding one column per column definition
+   */
+  private Schema convertTableElementsSchema(CreateTable.ColumnDefinition [] elements) {
+    Schema converted = new Schema();
+
+    for (CreateTable.ColumnDefinition definition : elements) {
+      String columnName = definition.getColumnName();
+      TajoDataTypes.Type dataType = TajoDataTypes.Type.valueOf(definition.getDataType());
+      converted.addColumn(new Column(columnName, dataType));
+    }
 
-    // if there are at least one join matched to the one of join conditions,
-    // we try to traverse the join tree in the depth-first manner and
-    // determine the initial join order. Here, we do not consider the join cost.
-    // The optimized join order will be considered in the optimizer.
-    if (joinTree.getJoinNum() > 0) {
-      Stack<String> stack = new Stack<String>();
-      Set<String> visited = Sets.newHashSet();
+    return converted;
+  }
 
+  /**
+   * Builds the logical node for a DROP TABLE statement.
+   *
+   * @param dropTable the DROP TABLE expression
+   * @return a DropTableNode created from the expression's table name
+   */
+  private LogicalNode buildDropTable(DropTable dropTable) {
+    return new DropTableNode(dropTable.getTableName());
+  }
 
-      // initially, one table is pushed into the stack
-      String seed = joinTree.getTables().iterator().next();
-      stack.add(seed);
+  /*===============================================================================================
+    Expression SECTION
+   ===============================================================================================*/
 
-      joinOrder = Lists.newArrayList();
+  /**
+   * Transforms an algebra expression into an evaluation tree.
+   *
+   * Literals become constants, column references become field evaluations, and operators,
+   * predicates and function calls are converted recursively. Aggregation functions also mark
+   * the query block named by blockName as having grouping.
+   *
+   * @param plan      the logical plan used to resolve columns and query blocks
+   * @param blockName the name of the query block the expression belongs to
+   * @param expr      the expression to be transformed
+   * @return the evaluation tree, or null if the expression type is Is or is not handled here
+   * @throws VerifyException if a column reference cannot be resolved
+   * @throws UndefinedFunctionException if a function is not registered in the catalog or
+   *         cannot be instantiated
+   */
+  public EvalNode createEvalTree(LogicalPlan plan, String blockName, final Expr expr)
+      throws VerifyException {
+    switch(expr.getType()) {
 
-      while (!stack.empty()) {
-        String table = stack.pop();
-        if (visited.contains(table)) {
-          continue;
+      // constants
+      case Literal:
+        LiteralValue literal = (LiteralValue) expr;
+        switch (literal.getValueType()) {
+          case String:
+            return new ConstEval(DatumFactory.createText(literal.getValue()));
+          case Unsigned_Integer:
+            return new ConstEval(DatumFactory.createInt4(literal.getValue()));
+          case Unsigned_Large_Integer:
+            return new ConstEval(DatumFactory.createInt8(literal.getValue()));
+          case Unsigned_Float:
+            return new ConstEval(DatumFactory.createFloat4(literal.getValue()));
+          default:
+            throw new RuntimeException("Unsupported type: " + literal.getValueType());
         }
-        visited.add(table);
 
-        // 'joinOrder' will contain all tables corresponding to the given join conditions.
-        for (Edge edge : joinTree.getEdges(table)) {
-          if (!visited.contains(edge.getTarget()) && !edge.getTarget().equals(table)) {
-            stack.add(edge.getTarget());
-            joinOrder.add(edge);
+      // unary expression
+      case Not:
+        NotExpr notExpr = (NotExpr) expr;
+        return new NotEval(createEvalTree(plan, blockName, notExpr.getChild()));
+
+      // binary expressions
+      case LikePredicate:
+        LikePredicate like = (LikePredicate) expr;
+        FieldEval field = (FieldEval) createEvalTree(plan, blockName, like.getColumnRef());
+        ConstEval pattern = (ConstEval) createEvalTree(plan, blockName, like.getPattern());
+        return new LikeEval(like.isNot(), field, pattern);
+
+      case Is:
+        // No evaluation node is built for this type; leaves the switch and returns null.
+        break;
+
+      case And:
+      case Or:
+      case Equals:
+      case NotEquals:
+      case LessThan:
+      case LessThanOrEquals:
+      case GreaterThan:
+      case GreaterThanOrEquals:
+      case Plus:
+      case Minus:
+      case Multiply:
+      case Divide:
+      case Modular:
+        BinaryOperator bin = (BinaryOperator) expr;
+        return new BinaryEval(exprTypeToEvalType(expr.getType()),
+            createEvalTree(plan, blockName, bin.getLeft()),
+            createEvalTree(plan, blockName, bin.getRight()));
+
+      // others
+      case Column:
+        return createFieldEval(plan, blockName, (ColumnReferenceExpr) expr);
+
+      case CountRowsFunction:
+        FunctionDesc countRows = catalog.getFunction("count", new DataType[] {});
+
+        try {
+          plan.getBlock(blockName).setHasGrouping();
+
+          return new AggFuncCallEval(countRows, (AggFunction) countRows.newInstance(),
+              new EvalNode [] {});
+        } catch (InternalException e) {
+          throw new UndefinedFunctionException(CatalogUtil.
+              getCanonicalName(countRows.getSignature(), new DataType[] {}));
+        }
+
+      case CountValueFunction:
+      case Function:
+        FunctionExpr function = (FunctionExpr) expr;
+        // Given parameters
+        Expr [] params = function.getParams();
+        EvalNode [] givenArgs = new EvalNode[params.length];
+        DataType[] paramTypes = new DataType[params.length];
+
+        if (expr.getType() == OpType.CountValueFunction) {
+          // Only the first parameter is evaluated here, and it is looked up with the ANY type.
+          givenArgs[0] = createEvalTree(plan, blockName, params[0]);
+          paramTypes[0] = CatalogUtil.newDataTypeWithoutLen(TajoDataTypes.Type.ANY);
+        } else {
+          for (int i = 0; i < params.length; i++) {
+            givenArgs[i] = createEvalTree(plan, blockName, params[i]);
+            paramTypes[i] = givenArgs[i].getValueType()[0];
           }
         }
-      }
 
-      subroot = new ScanNode(fromTableMap.get(joinOrder.get(0).getSrc()));
-      LogicalNode inner;
-      for (Edge edge : joinOrder) {
-        inner = new ScanNode(fromTableMap.get(edge.getTarget()));
-        join = new JoinNode(JoinType.CROSS_JOIN, subroot, inner);
-        subroot = join;
-
-        joinSchema = SchemaUtil.merge(
-            join.getOuterNode().getOutSchema(),
-            join.getInnerNode().getOutSchema());
-        join.setInSchema(joinSchema);
-        join.setOutSchema(joinSchema);
-      }
+        if (!catalog.containFunction(function.getSignature(), paramTypes)) {
+          throw new UndefinedFunctionException(CatalogUtil.
+              getCanonicalName(function.getSignature(), paramTypes));
+        }
+
+        FunctionDesc funcDesc = catalog.getFunction(function.getSignature(), paramTypes);
+
+        try {
+          if (funcDesc.getFuncType() == CatalogProtos.FunctionType.GENERAL) {
+            return new FuncCallEval(funcDesc,
+                (GeneralFunction) funcDesc.newInstance(), givenArgs);
+          } else {
+            plan.getBlock(blockName).setHasGrouping();
+            return new AggFuncCallEval(funcDesc,
+                (AggFunction) funcDesc.newInstance(), givenArgs);
+          }
+        } catch (InternalException e) {
+          // Must not fall through into the CaseWhen case, which would cast this function
+          // expression to CaseWhenPredicate. Report the failure the same way the
+          // CountRowsFunction case does.
+          throw new UndefinedFunctionException(CatalogUtil.
+              getCanonicalName(function.getSignature(), paramTypes));
+        }
+
+      case CaseWhen:
+        CaseWhenPredicate caseWhenExpr = (CaseWhenPredicate) expr;
+        return createCaseWhenEval(plan, blockName, caseWhenExpr);
+
+      case IsNullPredicate:
+        IsNullPredicate nullPredicate = (IsNullPredicate) expr;
+        return new IsNullEval(nullPredicate.isNot(),
+            createFieldEval(plan, blockName, nullPredicate.getColumnRef()));
+
+      default:
     }
+    return null;
+  }
 
-    // Here, there are two cases:
-    // 1) there already exists the join plan.
-    // 2) there are no join plan.
-    if (joinOrder != null) { // case 1)
-      // if there are join tables corresponding to any join condition,
-      // the join plan is placed as the outer plan of the product.
-      remain.remove(joinOrder.get(0).getSrc());
-      remain.remove(joinOrder.get(0).getTarget());
-    } else { // case 2)
-      // if there are no inferred joins, the one of the remain join tables is placed as the left table
-      subroot = new ScanNode(fromTableMap.get(remain.get(0)));
-      remain.remove(remain.get(0));
-    }
-
-    // Here, the variable 'remain' contains join tables which are not matched to any join conditions.
-    // Thus, they will be joined by catasian product
-    for (String table : remain) {
-      join = new JoinNode(JoinType.CROSS_JOIN,
-          subroot, new ScanNode(fromTableMap.get(table)));
-      joinSchema = SchemaUtil.merge(
-          join.getOuterNode().getOutSchema(),
-          join.getInnerNode().getOutSchema());
-      join.setInSchema(joinSchema);
-      join.setOutSchema(joinSchema);
-      subroot = join;
-    }
-
-    return subroot;
-  }
-
-  // TODO - this method is somewhat duplicated to createCrossJoinFromJoinCondition. Later, it should be removed.
-  private static LogicalNode createCatasianProduct(FromTable [] tables) {
-    LogicalNode subroot = new ScanNode(tables[0]);
-    Schema joinSchema;
-    if(tables.length > 1) {
-      for(int i=1; i < tables.length; i++) {
-        JoinNode join = new JoinNode(JoinType.CROSS_JOIN,
-            subroot, new ScanNode(tables[i]));
-        joinSchema = SchemaUtil.merge(
-            join.getOuterNode().getOutSchema(),
-            join.getInnerNode().getOutSchema());
-        join.setInSchema(joinSchema);
-        join.setOutSchema(joinSchema);
-        subroot = join;
-      }
+  /**
+   * Creates a field evaluation node for a column reference.
+   *
+   * A reference with a table name is looked up via LogicalPlan#findColumnFromRelation;
+   * a reference without one is resolved via LogicalPlan#suspectColumn.
+   *
+   * @param plan      the logical plan used to resolve the column
+   * @param blockName the name of the query block the reference belongs to
+   * @param columnRef the column reference to be resolved
+   * @return a FieldEval wrapping the resolved column
+   */
+  private FieldEval createFieldEval(LogicalPlan plan, String blockName,
+                                    ColumnReferenceExpr columnRef) throws VerifyException {
+    final Column resolved;
+    if (!columnRef.hasTableName()) {
+      resolved = plan.suspectColumn(blockName, columnRef.getName());
+    } else {
+      resolved = plan.findColumnFromRelation(blockName, columnRef.getTableName(), columnRef.getName());
     }
+    return new FieldEval(resolved);
+  }
 
-    return subroot;
+  /**
+   * Maps an algebra operator type to the corresponding evaluation node type.
+   *
+   * @param type the operator type to be mapped
+   * @return the matching evaluation node type
+   * @throws RuntimeException if the operator type has no mapping
+   */
+  private static Type exprTypeToEvalType(OpType type) {
+    final Type evalType;
+    switch (type) {
+      case And:
+        evalType = Type.AND;
+        break;
+      case Or:
+        evalType = Type.OR;
+        break;
+      case Equals:
+        evalType = Type.EQUAL;
+        break;
+      case NotEquals:
+        evalType = Type.NOT_EQUAL;
+        break;
+      case LessThan:
+        evalType = Type.LTH;
+        break;
+      case LessThanOrEquals:
+        evalType = Type.LEQ;
+        break;
+      case GreaterThan:
+        evalType = Type.GTH;
+        break;
+      case GreaterThanOrEquals:
+        evalType = Type.GEQ;
+        break;
+      case Plus:
+        evalType = Type.PLUS;
+        break;
+      case Minus:
+        evalType = Type.MINUS;
+        break;
+      case Multiply:
+        evalType = Type.MULTIPLY;
+        break;
+      case Divide:
+        evalType = Type.DIVIDE;
+        break;
+      case Modular:
+        evalType = Type.MODULAR;
+        break;
+      case Column:
+        evalType = Type.FIELD;
+        break;
+      case Function:
+        evalType = Type.FUNCTION;
+        break;
+      default:
+        throw new RuntimeException("Unsupported type: " + type);
+    }
+    return evalType;
   }
 
-  public static Schema getProjectedSchema(PlanningContext context, Target [] targets) {
+  /**
+   * Transforms a CASE WHEN predicate into a CaseWhenEval.
+   *
+   * Each WHEN branch contributes its condition and result evaluation trees; the ELSE
+   * result is set only if the predicate has one.
+   *
+   * @param plan      the logical plan used while building the evaluation trees
+   * @param blockName the name of the query block the predicate belongs to
+   * @param caseWhen  the CASE WHEN predicate to be transformed
+   * @return the resulting CaseWhenEval
+   */
+  public CaseWhenEval createCaseWhenEval(LogicalPlan plan, String blockName,
+                                              CaseWhenPredicate caseWhen) throws VerifyException {
+    CaseWhenEval eval = new CaseWhenEval();
+
+    for (CaseWhenPredicate.WhenExpr branch : caseWhen.getWhens()) {
+      EvalNode branchCondition = createEvalTree(plan, blockName, branch.getCondition());
+      EvalNode branchResult = createEvalTree(plan, blockName, branch.getResult());
+      eval.addWhen(branchCondition, branchResult);
+    }
+
+    if (caseWhen.hasElseResult()) {
+      EvalNode elseResult = createEvalTree(plan, blockName, caseWhen.getElseResult());
+      eval.setElseResult(elseResult);
+    }
+
+    return eval;
+  }
+
+  /**
+   * Converts an array of algebra targets into planner targets, keeping their order.
+   *
+   * @param plan      the logical plan used while building the evaluation trees
+   * @param blockName the name of the query block the targets belong to
+   * @param targets   the algebra targets to be converted
+   * @return an array of the same length holding the converted targets
+   */
+  Target [] annotateTargets(LogicalPlan plan, String blockName,
+                                       org.apache.tajo.algebra.Target [] targets)
+      throws VerifyException {
+    Target [] annotated = new Target[targets.length];
+
+    int index = 0;
+    for (org.apache.tajo.algebra.Target parsed : targets) {
+      annotated[index++] = createTarget(plan, blockName, parsed);
+    }
+    return annotated;
+  }
+
+  /**
+   * Converts a single algebra target into a planner target.
+   *
+   * @param plan    the logical plan used while building the evaluation tree
+   * @param blockId the name of the query block the target belongs to
+   * @param target  the algebra target to be converted
+   * @return a target wrapping the evaluation tree, carrying the alias if the source target has one
+   */
+  Target createTarget(LogicalPlan plan, String blockId,
+                             org.apache.tajo.algebra.Target target) throws VerifyException {
+    boolean aliased = target.hasAlias();
+    EvalNode evalTree = createEvalTree(plan, blockId, target.getExpr());
+    return aliased
+        ? new Target(evalTree, target.getAlias())
+        : new Target(evalTree);
+  }
+
+  /**
+   * It transforms a list of targets to schema. If it contains anonymous targets, it names them.
+   */
+  static Schema getProjectedSchema(LogicalPlan plan, Target [] targets) {
     Schema projected = new Schema();
     for(Target t : targets) {
       DataType type = t.getEvalTree().getValueType()[0];
@@ -652,7 +1022,7 @@ public class LogicalPlanner {
       if (t.hasAlias()) {
         name = t.getAlias();
       } else if (t.getEvalTree().getName().equals("?")) {
-        name = context.getGeneratedColumnName();
+        name = plan.newAnonymousColumnName();
       } else {
         name = t.getEvalTree().getName();
       }
@@ -661,27 +1031,4 @@ public class LogicalPlanner {
 
     return projected;
   }
-  
-  private static Schema getNaturalJoin(LogicalNode outer, LogicalNode inner) {
-    Schema joinSchema = new Schema();
-    Schema commons = SchemaUtil.getCommons(outer.getOutSchema(),
-        inner.getOutSchema());
-    joinSchema.addColumns(commons);
-    for (Column c : outer.getOutSchema().getColumns()) {
-      for (Column common : commons.getColumns()) {
-        if (!common.getColumnName().equals(c.getColumnName())) {
-          joinSchema.addColumn(c);
-        }
-      }
-    }
-
-    for (Column c : inner.getOutSchema().getColumns()) {
-      for (Column common : commons.getColumns()) {
-        if (!common.getColumnName().equals(c.getColumnName())) {
-          joinSchema.addColumn(c);
-        }
-      }
-    }
-    return joinSchema;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
new file mode 100644
index 0000000..56ae230
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/NoSuchColumnException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+/**
+ * A verification error reporting that no column with the given name exists.
+ */
+public class NoSuchColumnException extends VerifyException {
+  /**
+   * @param columnName the name of the column that could not be found
+   */
+  public NoSuchColumnException(String columnName) {
+    super(messageFor(columnName));
+  }
+
+  /** Builds the error message for the given column name. */
+  private static String messageFor(String columnName) {
+    return "ERROR: no such column '" + columnName + "'";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/00c3ee2b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 0796cfd..9671066 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -125,11 +125,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         return new LimitExec(ctx, limitNode.getInSchema(),
             limitNode.getOutSchema(), outer, limitNode);
 
-      case CREATE_INDEX:
-        IndexWriteNode createIndexNode = (IndexWriteNode) logicalNode;
-        outer = createPlanRecursive(ctx, createIndexNode.getSubNode());
-        return createIndexWritePlan(sm, ctx, createIndexNode, outer);
-
       case BST_INDEX_SCAN:
         IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
         outer = createIndexScanExec(ctx, indexScanNode);
@@ -290,14 +285,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new ExternalSortExec(ctx, sm, sortNode, subOp);
   }
 
-  public PhysicalExec createIndexWritePlan(StorageManager sm,
-                                           TaskAttemptContext ctx, IndexWriteNode indexWriteNode, PhysicalExec subOp)
-      throws IOException {
-
-    return new IndexWriteExec(ctx, sm, indexWriteNode, ctx.getTable(indexWriteNode
-        .getTableName()), subOp);
-  }
-
   public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
                                           IndexScanNode annotation)
       throws IOException {