You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/22 09:49:48 UTC

git commit: TAJO-543: InsertNode and CreateTableNode should play their roles.

Updated Branches:
  refs/heads/master 358dbace5 -> d06dd852e


TAJO-543: InsertNode and CreateTableNode should play their roles.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d06dd852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d06dd852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d06dd852

Branch: refs/heads/master
Commit: d06dd852e5f299a9cb329f870e4795a7bfda1917
Parents: 358dbac
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 22 13:44:41 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 22 17:40:37 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/engine/planner/BaseAlgebraVisitor.java |   6 +-
 .../engine/planner/BasicLogicalPlanVisitor.java |  12 +-
 .../apache/tajo/engine/planner/InsertNode.java  | 115 +++++-----
 .../engine/planner/LogicalPlanPreprocessor.java |   3 +-
 .../engine/planner/LogicalPlanVerifier.java     |  12 +-
 .../tajo/engine/planner/LogicalPlanner.java     | 223 ++++++++++++++-----
 .../engine/planner/PhysicalPlannerImpl.java     |  18 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |   3 +-
 .../engine/planner/PreLogicalPlanVerifier.java  |  49 ++++
 .../engine/planner/global/GlobalPlanner.java    |  36 ++-
 .../engine/planner/logical/CreateTableNode.java |  79 +------
 .../logical/PartitionedTableScanNode.java       |   1 -
 .../planner/logical/PersistentStoreNode.java    |  24 +-
 .../planner/logical/ShuffleFileWriteNode.java   |   4 +-
 .../engine/planner/logical/StoreIndexNode.java  |   4 +-
 .../engine/planner/logical/StoreTableNode.java  |  46 +---
 .../tajo/engine/planner/logical/UnaryNode.java  |   3 +-
 .../ColumnPartitionedTableStoreExec.java        |  32 +--
 .../engine/planner/physical/StoreTableExec.java |  11 +-
 .../planner/rewrite/ProjectionPushDownRule.java |  34 ++-
 .../org/apache/tajo/master/GlobalEngine.java    | 146 +++---------
 .../apache/tajo/master/querymaster/Query.java   |  32 ++-
 .../tajo/engine/planner/TestLogicalPlanner.java |  22 +-
 .../tajo/engine/query/TestInsertQuery.java      |   4 +-
 25 files changed, 486 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e4feb6a..020787c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -123,6 +123,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-543: InsertNode and CreateTableNode should play their roles. (hyunsik)
+
     TAJO-409: Add explored and explained annotations to Tajo function system.
     (SeongHwa Ahn via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index ddc75ff..6cc6fd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -269,8 +269,10 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
   @Override
   public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
     stack.push(expr);
-    for (NamedExpr target : expr.getNamedExprs()) {
-      visit(ctx, stack, target);
+    if (!expr.isAllProjected()) {
+      for (NamedExpr target : expr.getNamedExprs()) {
+        visit(ctx, stack, target);
+      }
     }
     RESULT result = visit(ctx, stack, expr.getChild());
     stack.pop();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 59c6872..b629013 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -256,15 +256,21 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   public RESULT visitInsert(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
                             Stack<LogicalNode> stack) throws PlanningException {
     stack.push(node);
-    RESULT result = visit(context, plan, plan.getBlock(node.getSubQuery()), node.getSubQuery(), stack);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
     stack.pop();
     return result;
   }
 
   @Override
   public RESULT visitCreateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node,
-                                 Stack<LogicalNode> stack) {
-    return null;
+                                 Stack<LogicalNode> stack) throws PlanningException {
+    RESULT result = null;
+    stack.push(node);
+    if (node.hasSubQuery()) {
+      result = visit(context, plan, block, node.getChild(), stack);
+    }
+    stack.pop();
+    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ecba05a..3dd371b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -20,30 +20,38 @@ package org.apache.tajo.engine.planner;
 
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.LogicalNodeVisitor;
 import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.util.TUtil;
 
-public class InsertNode extends LogicalNode implements Cloneable {
+public class InsertNode extends StoreTableNode implements Cloneable {
   @Expose private boolean overwrite;
-  @Expose private TableDesc targetTableDesc;
+  @Expose private Schema tableSchema;
+
+  /** a target schema of a target table */
   @Expose private Schema targetSchema;
+  /** an output schema of the select clause */
+  @Expose private Schema projectedSchema;
   @Expose private Path path;
-  @Expose private StoreType storageType;
-  @Expose private Options options;
-  @Expose private LogicalNode subQuery;
 
   public InsertNode(int pid) {
     super(pid, NodeType.INSERT);
   }
 
-  public void setTargetTableDesc(TableDesc desc) {
-    this.targetTableDesc = desc;
+  public void setTargetTable(TableDesc desc) {
+    setTableName(desc.getName());
+    tableSchema = desc.getSchema();
+    setPath(desc.getPath());
+    setOptions(desc.getMeta().getOptions());
+    setStorageType(desc.getMeta().getStoreType());
+
+    if (desc.hasPartitions()) {
+      this.setPartitions(desc.getPartitions());
+    }
   }
 
   public void setTargetLocation(Path path) {
@@ -51,7 +59,7 @@ public class InsertNode extends LogicalNode implements Cloneable {
   }
 
   public void setSubQuery(LogicalNode subQuery) {
-    this.subQuery = subQuery;
+    this.setChild(subQuery);
     this.setInSchema(subQuery.getOutSchema());
     this.setOutSchema(subQuery.getOutSchema());
   }
@@ -64,12 +72,12 @@ public class InsertNode extends LogicalNode implements Cloneable {
     this.overwrite = overwrite;
   }
 
-  public boolean hasTargetTable() {
-    return this.targetTableDesc != null;
+  public Schema getTableSchema() {
+    return tableSchema;
   }
 
-  public TableDesc getTargetTable() {
-    return this.targetTableDesc;
+  public void setTableSchema(Schema tableSchema) {
+    this.tableSchema = tableSchema;
   }
 
   public boolean hasTargetSchema() {
@@ -84,6 +92,14 @@ public class InsertNode extends LogicalNode implements Cloneable {
     this.targetSchema = schema;
   }
 
+  public Schema getProjectedSchema() {
+    return this.projectedSchema;
+  }
+
+  public void setProjectedSchema(Schema projected) {
+    this.projectedSchema = projected;
+  }
+
   public boolean hasPath() {
     return this.path != null;
   }
@@ -99,42 +115,16 @@ public class InsertNode extends LogicalNode implements Cloneable {
   public boolean hasStorageType() {
     return this.storageType != null;
   }
-
-  public void setStorageType(StoreType storageType) {
-    this.storageType = storageType;
-  }
-
-  public StoreType getStorageType() {
-    return this.storageType;
-  }
-  
-  public boolean hasOptions() {
-    return this.options != null;
-  }
-  
-  public void setOptions(Options opt) {
-    this.options = opt;
-  }
-  
-  public Options getOptions() {
-    return this.options;
-  }
-
-  public LogicalNode getSubQuery() {
-    return this.subQuery;
-  }
   
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof InsertNode) {
       InsertNode other = (InsertNode) obj;
       return super.equals(other)
-          && TUtil.checkEquals(this.targetTableDesc, other.targetTableDesc)
-          && TUtil.checkEquals(path, other.path)
           && this.overwrite == other.overwrite
-          && TUtil.checkEquals(this.storageType, other.storageType)
-          && TUtil.checkEquals(options, other.options)
-          && subQuery.equals(other.subQuery);
+          && TUtil.checkEquals(this.tableSchema, other.tableSchema)
+          && TUtil.checkEquals(this.targetSchema, other.targetSchema)
+          && TUtil.checkEquals(path, other.path);
     } else {
       return false;
     }
@@ -142,14 +132,12 @@ public class InsertNode extends LogicalNode implements Cloneable {
   
   @Override
   public Object clone() throws CloneNotSupportedException {
-    InsertNode store = (InsertNode) super.clone();
-    store.targetTableDesc = targetTableDesc != null ? targetTableDesc : null;
-    store.path = path != null ? new Path(path.toString()) : null;
-    store.overwrite = overwrite;
-    store.storageType = storageType != null ? storageType : null;
-    store.options = (Options) (options != null ? options.clone() : null);
-    store.subQuery = (LogicalNode) subQuery.clone();
-    return store;
+    InsertNode insertNode = (InsertNode) super.clone();
+    insertNode.overwrite = overwrite;
+    insertNode.tableSchema = new Schema(tableSchema);
+    insertNode.targetSchema = targetSchema != null ? new Schema(targetSchema) : null;
+    insertNode.path = path != null ? new Path(path.toString()) : null;
+    return insertNode;
   }
   
   public String toString() {
@@ -158,38 +146,47 @@ public class InsertNode extends LogicalNode implements Cloneable {
     if (overwrite) {
       sb.append("OVERWRITE ");
     }
-    sb.append("INTO");
+    sb.append("INTO ");
 
     if (hasTargetTable()) {
-      sb.append(targetTableDesc.getName());
+      sb.append("  ").append(tableName);
+    }
+
+    if (hasTargetSchema()) {
+      sb.append("\n  ").append(targetSchema);
     }
 
     if (hasPath()) {
-      sb.append(" LOCATION ");
+      sb.append("\n  LOCATION ");
       sb.append(path);
     }
 
-    sb.append(" [SUBQUERY]");
+    sb.append("\n").append(getChild());
     
     return sb.toString();
   }
 
   @Override
   public void preOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);    
+    getChild().preOrder(visitor);
+    visitor.visit(this);
   }
 
   @Override
   public void postOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);    
+    visitor.visit(this);
+    getChild().postOrder(visitor);
   }
 
   @Override
   public PlanString getPlanString() {
     PlanString planString = new PlanString("INSERT");
-    planString.addExplan(" INTO ");
+    planString.appendTitle(" INTO ");
     if (hasTargetTable()) {
-      planString.addExplan(getTargetTable().getName());
+      planString.appendTitle(getTableName());
+      if (hasTargetSchema()) {
+        planString.addExplan(getTargetSchema().toString());
+      }
     } else {
       planString.addExplan("LOCATION " + path);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
index 3ab328c..586ef68 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -290,8 +290,7 @@ class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor
   ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 
   public LogicalNode visitInsert(PreprocessContext ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
-    PreprocessContext newContext = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
-    LogicalNode child = super.visitInsert(newContext, stack, expr);
+    LogicalNode child = super.visitInsert(ctx, stack, expr);
 
     InsertNode insertNode = new InsertNode(ctx.plan.newPID());
     insertNode.setInSchema(child.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index 41e42d0..ac76f57 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -164,21 +164,17 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
                                      StoreTableNode node, Stack<LogicalNode> stack) throws PlanningException {
     visit(state, plan, block, node.getChild(), stack);
 
-    if (node.isCreatedTable() && catalog.existsTable(node.getTableName())) {
-      state.addVerification("relation \"" + node.getTableName() + "\" already exists");
-    }
-
     return node;
   }
 
   @Override
   public LogicalNode visitInsert(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode child = visit(state, plan, plan.getBlock(node.getSubQuery()), node.getSubQuery(), stack);
+    LogicalNode child = visit(state, plan, block, node.getChild(), stack);
 
-    if (node.hasTargetSchema()) {
-      ensureDomains(state, node.getTargetSchema(), node.getSubQuery().getOutSchema());
-    }
+//    if (node.hasTargetSchema()) {
+//      ensureDomains(state, node.getTargetSchema(), node.getChild().getOutSchema());
+//    }
 
     return child;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 49d5ffb..8995226 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.exception.VerifyException;
@@ -1041,60 +1042,153 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
   public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws PlanningException {
     stack.push(expr);
-    QueryBlock newQueryBlock = context.plan.getBlockByExpr(expr.getSubQuery());
-    PlanContext newContext = new PlanContext(context, newQueryBlock);
-    Stack<Expr> subStack = new Stack<Expr>();
-    LogicalNode subQuery = visit(newContext, subStack, expr.getSubQuery());
-    context.plan.connectBlocks(newQueryBlock, context.queryBlock, BlockType.TableSubQuery);
+    LogicalNode subQuery = super.visitInsert(context, stack, expr);
     stack.pop();
 
-    InsertNode insertNode = null;
-    if (expr.hasTableName()) {
-      TableDesc desc = catalog.getTableDesc(expr.getTableName());
+    InsertNode insertNode = context.queryBlock.getNodeFromExpr(expr);
+    insertNode.setOverwrite(expr.isOverwrite());
+    insertNode.setSubQuery(subQuery);
+
+    if (expr.hasTableName()) { // INSERT (OVERWRITE) INTO TABLE ...
+      return buildInsertIntoTablePlan(context, insertNode, expr);
+    } else if (expr.hasLocation()) { // INSERT (OVERWRITE) INTO LOCATION ...
+      return buildInsertIntoLocationPlan(context, insertNode, expr);
+    } else {
+      throw new IllegalStateException("Invalid Query");
+    }
+  }
+
+  /**
+   * Builds an InsertNode with a target table.
+   *
+   * ex) INSERT OVERWRITE INTO TABLE ...
+   * <br />
+   *
+   * We use the following terms: target table, target columns, and projected columns.
+   * <pre>
+   * INSERT INTO    TB_NAME        (col1, col2)          SELECT    c1,   c2        FROM ...
+   *                ^^^^^^^        ^^^^^^^^^^^^                  ^^^^^^^^^^^^
+   *             target table   target columns (or schema)     projected columns (or schema)
+   * </pre>
+   */
+  private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
+      throws PlanningException {
+    // Get and set a target table
+    TableDesc desc = catalog.getTableDesc(expr.getTableName());
+    insertNode.setTargetTable(desc);
+
+    //
+    // When we use 'INSERT (OVERWRITE) INTO TABLE' statements, there are two cases.
+    //
+    // First, when a user specified target columns
+    // INSERT (OVERWRITE)? INTO table_name (col1 type, col2 type) SELECT ...
+    //
+    // Second, when a user does not specify target columns
+    // INSERT (OVERWRITE)? INTO table_name SELECT ...
+    //
+    // In the former case, the target columns' schema and the corresponding projected columns' schema
+    // must be equivalent or implicitly castable.
+    //
+    // In the latter case, the target table's schema and the projected columns'
+    // schema of the select clause can differ from each other. In this case,
+    // we use only the leading columns of the target table's schema
+    // as target columns.
+    //
+    // For example, consider a target table and an 'insert into' query given as follows:
+    //
+    // CREATE TABLE TB1                  (col1 int,  col2 int, col3 long);
+    //                                      ||          ||
+    // INSERT OVERWRITE INTO TB1 SELECT  order_key,  part_num               FROM ...
+    //
+    // In this example, only col1 and col2 are used as target columns.
+
+    if (expr.hasTargetColumns()) { // when a user specified target columns
+
+      if (expr.getTargetColumns().length > insertNode.getChild().getOutSchema().getColumnNum()) {
+        throw new PlanningException("Target columns and projected columns are mismatched to each other");
+      }
+
+      // See PreLogicalPlanVerifier.visitInsert.
+      // It guarantees that the numbers of target and projected columns are equal.
+
       context.queryBlock.addRelation(new ScanNode(context.plan.newPID(), desc));
+      String [] targets = expr.getTargetColumns();
+      Schema targetColumns = new Schema();
+      for (int i = 0; i < targets.length; i++) {
+        Column targetColumn = context.plan.resolveColumn(context.queryBlock, new ColumnReferenceExpr(targets[i]));
+        targetColumns.addColumn(targetColumn);
+      }
+      insertNode.setTargetSchema(targetColumns);
+      insertNode.setOutSchema(targetColumns);
+      buildProjectedInsert(insertNode);
 
-      Schema targetSchema = new Schema();
-      if (expr.hasTargetColumns()) {
-        // INSERT OVERWRITE INTO TABLE tbl(col1 type, col2 type) SELECT ...
-        String[] targetColumnNames = expr.getTargetColumns();
-        for (int i = 0; i < targetColumnNames.length; i++) {
-          Column targetColumn = context.plan.resolveColumn(context.queryBlock,
-              new ColumnReferenceExpr(targetColumnNames[i]));
-          targetSchema.addColumn(targetColumn);
-        }
-      } else {
-        // use the output schema of select clause as target schema
-        // if didn't specific target columns like the way below,
-        // INSERT OVERWRITE INTO TABLE tbl SELECT ...
-        Schema targetTableSchema = desc.getSchema();
-        for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
-          targetSchema.addColumn(targetTableSchema.getColumn(i));
-        }
+    } else { // when a user do not specified target columns
+
+      // The output schema of select clause determines the target columns.
+      Schema tableSchema = desc.getSchema();
+      Schema projectedSchema = insertNode.getChild().getOutSchema();
+
+      Schema targetColumns = new Schema();
+      for (int i = 0; i < projectedSchema.getColumnNum(); i++) {
+        targetColumns.addColumn(tableSchema.getColumn(i));
       }
+      insertNode.setTargetSchema(targetColumns);
+      buildProjectedInsert(insertNode);
+    }
 
-      insertNode = context.queryBlock.getNodeFromExpr(expr);
-      insertNode.setTargetTableDesc(desc);
-      insertNode.setSubQuery(subQuery);
-      insertNode.setTargetSchema(targetSchema);
-      insertNode.setOutSchema(targetSchema);
+    if (desc.hasPartitions()) {
+      insertNode.setPartitions(desc.getPartitions());
     }
+    return insertNode;
+  }
 
-    if (expr.hasLocation()) {
-      insertNode = context.queryBlock.getNodeFromExpr(expr);
-      insertNode.setTargetLocation(new Path(expr.getLocation()));
-      insertNode.setSubQuery(subQuery);
-      if (expr.hasStorageType()) {
-        insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
-      }
-      if (expr.hasParams()) {
-        Options options = new Options();
-        options.putAll(expr.getParams());
-        insertNode.setOptions(options);
+  private void buildProjectedInsert(InsertNode insertNode) {
+    Schema tableSchema = insertNode.getTableSchema();
+    Schema targetColumns = insertNode.getTargetSchema();
+
+    ProjectionNode projectionNode = insertNode.getChild();
+
+    // Modify the projected columns by adding NULL constants.
+    // This is necessary because the table appender does not support specifying target columns to be written.
+    List<Target> targets = TUtil.newList();
+    for (int i = 0, j = 0; i < tableSchema.getColumnNum(); i++) {
+      Column column = tableSchema.getColumn(i);
+
+      if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
+        targets.add(projectionNode.getTargets()[j++]);
+      } else {
+        targets.add(new Target(new ConstEval(NullDatum.get()), column.getColumnName()));
       }
     }
+    projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
 
-    insertNode.setOverwrite(expr.isOverwrite());
+    insertNode.setInSchema(projectionNode.getOutSchema());
+    insertNode.setOutSchema(projectionNode.getOutSchema());
+    insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  /**
+   * Builds an InsertNode with a location.
+   *
+   * ex) INSERT OVERWRITE INTO LOCATION 'hdfs://....' ..
+   */
+  private InsertNode buildInsertIntoLocationPlan(PlanContext context, InsertNode insertNode, Insert expr) {
+    // INSERT (OVERWRITE)? INTO LOCATION path (USING file_type (param_clause)?)? query_expression
 
+    Schema childSchema = insertNode.getChild().getOutSchema();
+    insertNode.setInSchema(childSchema);
+    insertNode.setOutSchema(childSchema);
+    insertNode.setTableSchema(childSchema);
+    insertNode.setTargetLocation(new Path(expr.getLocation()));
+
+    if (expr.hasStorageType()) {
+      insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+    }
+    if (expr.hasParams()) {
+      Options options = new Options();
+      options.putAll(expr.getParams());
+      insertNode.setOptions(options);
+    }
     return insertNode;
   }
 
@@ -1106,43 +1200,50 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
       throws PlanningException {
 
-    String tableName = expr.getTableName();
+    // Get a table name to be created.
+    String tableNameTobeCreated = expr.getTableName();
 
-    if (expr.hasSubQuery()) {
+    if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
       stack.add(expr);
       LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
       stack.pop();
-      StoreTableNode storeNode = new StoreTableNode(context.plan.newPID(), tableName);
-      storeNode.setCreateTable();
-      storeNode.setChild(subQuery);
+      CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+      createTableNode.setTableName(tableNameTobeCreated);
+      createTableNode.setChild(subQuery);
+      createTableNode.setInSchema(subQuery.getOutSchema());
 
-      storeNode.setInSchema(subQuery.getOutSchema());
+      // if no table definition, the select clause's output schema will be used.
+      // ex) CREATE TABLE tbl AS SELECT ...
       if(!expr.hasTableElements()) {
-        // CREATE TABLE tbl AS SELECT ...
+
         expr.setTableElements(convertSchemaToTableElements(subQuery.getOutSchema()));
       }
-      // else CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
-      storeNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
 
-      if (expr.hasStorageType()) {
-        storeNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
-      } else {
+      // Otherwise, it uses the defined table elements.
+      // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
+      createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
+      createTableNode.setSchema(convertTableElementsSchema(expr.getTableElements()));
+
+      if (expr.hasStorageType()) { // If storage type (using clause) is specified
+        createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+      } else { // If no specified storage type
         // default type
-        storeNode.setStorageType(CatalogProtos.StoreType.CSV);
+        createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
       }
 
-      if (expr.hasParams()) {
+      if (expr.hasParams()) { // if 'with clause' is specified
         Options options = new Options();
         options.putAll(expr.getParams());
-        storeNode.setOptions(options);
+        createTableNode.setOptions(options);
       }
 
-      if (expr.hasPartition()) {
-        storeNode.setPartitions(convertTableElementsPartition(context, expr));
+      if (expr.hasPartition()) { // if 'partition by' is specified
+        createTableNode.setPartitions(convertTableElementsPartition(context, expr));
       }
 
-      return storeNode;
-    } else {
+      return createTableNode;
+
+    } else { // if CREATE AN EMPTY TABLE
       Schema tableSchema;
       boolean mergedPartition = false;
       if (expr.hasPartition()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index d9689aa..7de053b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -26,32 +26,32 @@ import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Projection;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.List;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
 
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
@@ -90,8 +90,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
                                            PhysicalExec execPlan) throws IOException {
     DataChannel channel = context.getDataChannel();
-    ShuffleFileWriteNode shuffleFileWriteNode =
-        new ShuffleFileWriteNode(UNGENERATED_PID, channel.getTargetId().toString());
+    ShuffleFileWriteNode shuffleFileWriteNode = new ShuffleFileWriteNode(UNGENERATED_PID);
+    shuffleFileWriteNode.setTableName(channel.getTargetId().toString());
     shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
     shuffleFileWriteNode.setInSchema(plan.getOutSchema());
     shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
@@ -116,6 +116,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         EvalExprNode evalExpr = (EvalExprNode) logicalNode;
         return new EvalExprExec(ctx, evalExpr);
 
+      case CREATE_TABLE:
+      case INSERT:
       case STORE:
         StoreTableNode storeNode = (StoreTableNode) logicalNode;
         leftExec = createPlanRecursive(ctx, storeNode.getChild());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 66358d5..ae5f4fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -48,7 +48,8 @@ public class PlannerUtil {
       baseNode = ((LogicalRootNode) node).getChild();
     }
 
-    return baseNode.getType() == NodeType.CREATE_TABLE || baseNode.getType() == NodeType.DROP_TABLE;
+    return (baseNode.getType() == NodeType.CREATE_TABLE && !((CreateTableNode)baseNode).hasSubQuery()) ||
+        baseNode.getType() == NodeType.DROP_TABLE;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
new file mode 100644
index 0000000..77e2a0b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.Insert;
+import org.apache.tajo.algebra.OpType;
+import org.apache.tajo.algebra.Projection;
+
+import java.util.Stack;
+
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<VerificationState, Expr> {
+
+  /** Records a verification error when an INSERT's target column count differs from its projection's. */
+  @Override
+  public Expr visitInsert(VerificationState ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(ctx, stack, expr);
+
+    // The arity check only applies to a projection child with an explicit target column list.
+    if (child != null && child.getType() == OpType.Projection && expr.hasTargetColumns()) {
+      Projection projection = (Projection) child;
+      int projectColumnNum = projection.getNamedExprs().length;
+      int targetColumnNum = expr.getTargetColumns().length;
+      if (targetColumnNum > projectColumnNum) {
+        ctx.addVerification("INSERT has more target columns than expressions");
+      } else if (targetColumnNum < projectColumnNum) {
+        ctx.addVerification("INSERT has more expressions than target columns");
+      }
+    }
+
+    return child;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 58ad73c..341a58a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -389,12 +389,27 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
     DataChannel channel;
     CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+
     if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
       channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
       Column[] columns = new Column[partitionDesc.getColumns().size()];
-      channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+
+      if (currentNode.getType() == NodeType.INSERT) {
+        InsertNode insertNode = (InsertNode) currentNode;
+        channel.setSchema(((InsertNode)currentNode).getProjectedSchema());
+        Column [] shuffleKeys = new Column[partitionDesc.getColumns().size()];
+        int i = 0;
+        for (Column column : partitionDesc.getColumns()) {
+          int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+          shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
+        }
+        channel.setShuffleKeys(shuffleKeys);
+      } else {
+        channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+      }
       channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
+
     } else {
       throw new PlanningException(String.format("Not Supported PartitionsType :%s", partitionsType));
     }
@@ -679,13 +694,26 @@ public class GlobalPlanner {
     }
 
     @Override
+    public LogicalNode visitCreateTable(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
+                                       CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+      LogicalNode child = super.visitStoreTable(context, plan, queryBlock, node, stack);
+
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
+
+      return node;
+    }
+
+    @Override
     public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
                                    InsertNode node, Stack<LogicalNode> stack)
         throws PlanningException {
       LogicalNode child = super.visitInsert(context, plan, queryBlock, node, stack);
-      ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-      execBlock.setPlan(node);
-      context.execBlockMap.put(node.getPID(), execBlock);
+
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
 
       return node;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 3dc5655..66a2355 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -20,36 +20,20 @@ package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
-public class CreateTableNode extends LogicalNode implements Cloneable {
-  @Expose private String tableName;
-  @Expose private Column[] partitionKeys;
-  @Expose private StoreType storageType;
+public class CreateTableNode extends StoreTableNode implements Cloneable {
   @Expose private Schema schema;
   @Expose private Path path;
-  @Expose private Options options;
   @Expose private boolean external;
-  @Expose private PartitionDesc partitionDesc;
 
   public CreateTableNode(int pid) {
     super(pid, NodeType.CREATE_TABLE);
   }
 
-  public void setTableName(String name) {
-    this.tableName = name;
-  }
-
-  public final String getTableName() {
-    return this.tableName;
-  }
-
   public void setSchema(Schema schema) {
     this.schema = schema;
   }
@@ -58,14 +42,6 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     return this.schema;
   }
 
-  public void setStorageType(StoreType storageType) {
-    this.storageType = storageType;
-  }
-
-  public StoreType getStorageType() {
-    return this.storageType;
-  }
-
   public boolean hasPath() {
     return this.path != null;
   }
@@ -77,18 +53,6 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   public Path getPath() {
     return this.path;
   }
-  
-  public boolean hasOptions() {
-    return this.options != null;
-  }
-  
-  public void setOptions(Options opt) {
-    this.options = opt;
-  }
-  
-  public Options getOptions() {
-    return this.options;
-  }
 
   public boolean isExternal() {
     return external;
@@ -98,16 +62,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     this.external = external;
   }
 
-  public PartitionDesc getPartitions() {
-    return partitionDesc;
-  }
-
-  public void setPartitions(PartitionDesc partitionDesc) {
-    this.partitionDesc = partitionDesc;
-  }
-
-  public boolean hasPartition() {
-    return this.partitionDesc != null;
+  public boolean hasSubQuery() {
+    return child != null;
   }
 
   @Override
@@ -120,14 +76,9 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     if (obj instanceof CreateTableNode) {
       CreateTableNode other = (CreateTableNode) obj;
       return super.equals(other)
-          && this.tableName.equals(other.tableName)
           && this.schema.equals(other.schema)
-          && this.storageType == other.storageType
           && this.external == other.external
-          && TUtil.checkEquals(path, other.path)
-          && TUtil.checkEquals(options, other.options)
-          && TUtil.checkEquals(partitionKeys, other.partitionKeys)
-          && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+          && TUtil.checkEquals(path, other.path);
     } else {
       return false;
     }
@@ -141,29 +92,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     store.storageType = storageType;
     store.external = external;
     store.path = path != null ? new Path(path.toString()) : null;
-    store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
     store.options = (Options) (options != null ? options.clone() : null);
-    store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
     return store;
   }
   
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("\"CreateTable\": {\"table\": \""+tableName+"\",");
-    if (partitionKeys != null) {
-      sb.append("\"partition keys: [");
-      for (int i = 0; i < partitionKeys.length; i++) {
-        sb.append(partitionKeys[i]);
-        if (i < partitionKeys.length - 1)
-          sb.append(",");
-      }
-      sb.append("],");
-    }
     sb.append("\"schema: \"{" + this.schema).append("}");
     sb.append(",\"storeType\": \"" + this.storageType);
     sb.append(",\"path\" : \"" + this.path).append("\",");
     sb.append(",\"external\" : \"" + this.external).append("\",");
-    sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema())
@@ -174,11 +113,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
 
   @Override
   public void preOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);    
+    visitor.visit(this); // pre-order: this node first, then the optional subquery
+    if (hasSubQuery()) {
+      child.preOrder(visitor);
+    }
   }
 
   @Override
   public void postOrder(LogicalNodeVisitor visitor) {
-    visitor.visit(this);    
+    if (hasSubQuery()) {
+      child.postOrder(visitor); // post-order: the optional subquery is walked before this node
+    }
+    visitor.visit(this);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
index 21f5ef4..d687829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -82,7 +82,6 @@ public class PartitionedTableScanNode extends ScanNode {
       sb.append("\n");
     }
 
-	  sb.append(",");
 	  sb.append("\n  \"out schema\": ").append(getOutSchema());
 	  sb.append("\n  \"in schema\": ").append(getInSchema());
 	  return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
index 2f1a487..1bd3548 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -21,10 +21,11 @@ package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+
 
 /**
  * <code>PersistentStoreNode</code> an expression for a persistent data store step.
@@ -32,11 +33,18 @@ import org.apache.tajo.util.TUtil;
  */
 public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
   @Expose protected String tableName;
-  @Expose protected CatalogProtos.StoreType storageType = CatalogProtos.StoreType.CSV;
+  @Expose protected StoreType storageType = StoreType.CSV;
   @Expose protected Options options;
 
-  public PersistentStoreNode(int pid, String tableName) {
-    super(pid, NodeType.STORE);
+  public PersistentStoreNode(int pid, NodeType nodeType) {
+    super(pid, nodeType);
+  }
+
+  public boolean hasTargetTable() {
+    return tableName != null;
+  }
+
+  public void setTableName(String tableName) {
     this.tableName = tableName;
   }
 
@@ -44,11 +52,11 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
     return this.tableName;
   }
 
-  public void setStorageType(CatalogProtos.StoreType storageType) {
+  public void setStorageType(StoreType storageType) {
     this.storageType = storageType;
   }
 
-  public CatalogProtos.StoreType getStorageType() {
+  public StoreType getStorageType() {
     return this.storageType;
   }
 
@@ -60,6 +68,10 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
     return this.options;
   }
 
+  public void setOptions(Options options) {
+    this.options = options;
+  }
+
   @Override
   public PlanString getPlanString() {
     PlanString planStr = new PlanString("Store");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
index 180b1a2..3abec9d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -34,8 +34,8 @@ public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneab
   @Expose private int numOutputs;
   @Expose private Column [] shuffleKeys;
 
-  public ShuffleFileWriteNode(int pid, String tableName) {
-    super(pid, tableName);
+  public ShuffleFileWriteNode(int pid) {
+    super(pid, NodeType.STORE);
   }
     
   public final int getNumOutputs() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
index c3d7307..fb8ae3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.logical;
 
 public class StoreIndexNode extends StoreTableNode {
 
-  public StoreIndexNode(int pid, String tableName) {
-    super(pid, tableName);
+  public StoreIndexNode(int pid) {
+    super(pid);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 843a70f..2aca6a7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -19,45 +19,23 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-
 public class StoreTableNode extends PersistentStoreNode implements Cloneable {
-  @Expose private boolean isCreatedTable = false;
-  @Expose private boolean isOverwritten = false;
   @Expose private PartitionDesc partitionDesc;
 
-  public StoreTableNode(int pid, String tableName) {
-    super(pid, tableName);
-  }
-
-  public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
-    super(pid, tableName);
-    this.partitionDesc = partitionDesc;
-  }
-
-  public void setStorageType(StoreType storageType) {
-    this.storageType = storageType;
-  }
-
-  public StoreType getStorageType() {
-    return this.storageType;
-  }
-
-  public boolean hasOptions() {
-    return this.options != null;
+  public StoreTableNode(int pid) {
+    super(pid, NodeType.STORE);
   }
 
-  public void setOptions(Options options) {
-    this.options = options;
+  protected StoreTableNode(int pid, NodeType nodeType) {
+    super(pid, nodeType);
   }
 
-  public Options getOptions() {
-    return this.options;
+  public boolean hasPartition() {
+    return this.partitionDesc != null;
   }
 
   public PartitionDesc getPartitions() {
@@ -76,22 +54,12 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
 
     return planStr;
   }
-
-  public boolean isCreatedTable() {
-    return isCreatedTable;
-  }
-
-  public void setCreateTable() {
-    isCreatedTable = true;
-  }
   
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof StoreTableNode) {
       StoreTableNode other = (StoreTableNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && isCreatedTable == other.isCreatedTable;
-      eq = eq && isOverwritten == other.isOverwritten;
       eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {
@@ -102,8 +70,6 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     StoreTableNode store = (StoreTableNode) super.clone();
-    store.isCreatedTable = isCreatedTable;
-    store.isOverwritten = isOverwritten;
     store.partitionDesc = partitionDesc;
     return store;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 8817c41..0b06e9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.util.TUtil;
 
 
 public abstract class UnaryNode extends LogicalNode implements Cloneable {
@@ -43,7 +44,8 @@ public abstract class UnaryNode extends LogicalNode implements Cloneable {
   public boolean deepEquals(Object o) {
     if (o instanceof UnaryNode) {
       UnaryNode u = (UnaryNode) o;
-      return equals(o) && child.deepEquals(u.child);
+      // The child may be absent, so guard against null while still comparing the subtree recursively.
+      return equals(o) && (child == null ? u.child == null : u.child != null && child.deepEquals(u.child));
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index c873b93..338d2f2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -34,6 +34,8 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.Appender;
@@ -83,21 +85,21 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     }
 
     // Find column index to name subpartition directory path
-    if (this.plan.getPartitions() != null) {
-      if (this.plan.getPartitions().getColumns() != null) {
-        partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
-        partitionColumnNames = new String[partitionColumnIndices.length];
-        Schema columnPartitionSchema = plan.getPartitions().getSchema();
-        for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++)  {
-          Column targetColumn = columnPartitionSchema.getColumn(i);
-          for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
-            Column inputColumn = plan.getInSchema().getColumn(j);
-            if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
-              partitionColumnIndices[i] = j;
-              partitionColumnNames[i] = targetColumn.getColumnName();
-            }
-          }
-        }
+    int partitionKeyNum = this.plan.getPartitions().getColumns().size();
+    partitionColumnIndices = new int[partitionKeyNum];
+    partitionColumnNames = new String[partitionKeyNum];
+    for (int i = 0; i < partitionKeyNum; i++) {
+      Column column = this.plan.getPartitions().getColumns().get(i);
+      partitionColumnNames[i] = column.getColumnName();
+
+      if (this.plan.getType() == NodeType.INSERT) {
+        InsertNode insertNode = ((InsertNode)plan);
+        int idx = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+        partitionColumnIndices[i] = idx;
+      } else {
+        // We can get partition column from a logical schema.
+        // Don't use output schema because it is rewritten.
+        partitionColumnIndices[i] = plan.getOutSchema().getColumnId(column.getQualifiedName());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index affdc86..f097d0c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -23,6 +23,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
@@ -54,8 +55,14 @@ public class StoreTableExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
-    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
-        context.getOutputPath());
+    if (plan instanceof InsertNode) {
+      InsertNode createTableNode = (InsertNode) plan;
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+          createTableNode.getTableSchema(), context.getOutputPath());
+    } else {
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
+          context.getOutputPath());
+    }
 
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 86599f3..6264961 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -61,14 +61,7 @@ public class ProjectionPushDownRule extends
   public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
     LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
 
-    LogicalPlan.QueryBlock topmostBlock;
-
-    // skip a non-table-expression block.
-    if (plan.getRootBlock().getRootType() == NodeType.INSERT) {
-      topmostBlock = plan.getChildBlocks(rootBlock).get(0);
-    } else {
-      topmostBlock = rootBlock;
-    }
+    LogicalPlan.QueryBlock topmostBlock = rootBlock;
 
     Stack<LogicalNode> stack = new Stack<LogicalNode>();
     Context context = new Context(plan);
@@ -98,6 +91,11 @@ public class ProjectionPushDownRule extends
     }
 
     private String add(String name, EvalNode evalNode) throws PlanningException {
+      if (evalNode.getType() == EvalType.CONST) {
+        nameToEvalMap.put(name, evalNode);
+        resolvedFlags.put(name, false);
+        return name;
+      }
       if (evalToNameMap.containsKey(evalNode)) {
         name = evalToNameMap.get(evalNode);
       } else {
@@ -178,6 +176,9 @@ public class ProjectionPushDownRule extends
 
     public void resolve(Target target) {
       EvalNode evalNode = target.getEvalTree();
+      if (evalNode.getType() == EvalType.CONST) { // if constant value
+        return; // keep it raw always
+      }
       if (!evalToNameMap.containsKey(evalNode)) {
         throw new RuntimeException("No such eval: " + evalNode);
       }
@@ -327,7 +328,13 @@ public class ProjectionPushDownRule extends
       case INSERT:
         InsertNode insertNode = (InsertNode) parentNode;
         insertNode.setSubQuery(child);
-
+        break;
+      case CREATE_TABLE:
+        CreateTableNode createTableNode = (CreateTableNode) parentNode;
+        createTableNode.setChild(child);
+        createTableNode.setInSchema(child.getOutSchema());
+        createTableNode.setInSchema(child.getOutSchema());
+        break;
       default:
         throw new PlanningException("Unexpected Parent Node: " + parentNode.getType());
       }
@@ -743,4 +750,13 @@ public class ProjectionPushDownRule extends
 
     return node;
   }
+
+  @Override
+  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return node;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c40810..cfe3b61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -33,12 +33,8 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
 import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.exception.IllegalQueryStatusException;
 import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.parser.HiveConverter;
@@ -56,6 +52,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Stack;
 
 import static org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
 
@@ -69,9 +66,10 @@ public class GlobalEngine extends AbstractService {
   private SQLAnalyzer analyzer;
   private HiveConverter converter;
   private CatalogService catalog;
+  private PreLogicalPlanVerifier preVerifier;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private LogicalPlanVerifier verifier;
+  private LogicalPlanVerifier annotatedPlanVerifier;
   private DistributedQueryHookManager hookManager;
 
   public GlobalEngine(final MasterContext context) {
@@ -85,9 +83,10 @@ public class GlobalEngine extends AbstractService {
     try  {
       analyzer = new SQLAnalyzer();
       converter = new HiveConverter();
+      preVerifier = new PreLogicalPlanVerifier();
       planner = new LogicalPlanner(context.getCatalog());
       optimizer = new LogicalOptimizer(context.getConf());
-      verifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
+      annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
 
       hookManager = new DistributedQueryHookManager();
       hookManager.addHook(new CreateTableHook());
@@ -134,7 +133,6 @@ public class GlobalEngine extends AbstractService {
       context.getSystemMetrics().counter("Query", "totalQuery").inc();
 
       Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
-
       LogicalPlan plan = createLogicalPlan(planningContext);
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
@@ -234,14 +232,23 @@ public class GlobalEngine extends AbstractService {
 
   private LogicalPlan createLogicalPlan(Expr expression) throws PlanningException {
 
+    VerificationState state = new VerificationState();
+    preVerifier.visit(state, new Stack<Expr>(), expression);
+    if (!state.verified()) {
+      StringBuilder sb = new StringBuilder();
+      for (String error : state.getErrorMessages()) {
+        sb.append(error).append("\n");
+      }
+      throw new VerifyException(sb.toString());
+    }
+
     LogicalPlan plan = planner.createPlan(expression);
     optimizer.optimize(plan);
     if (LOG.isDebugEnabled()) {
       LOG.debug("LogicalPlan:\n" + plan.getRootBlock().getRoot());
     }
 
-    VerificationState state = new VerificationState();
-    verifier.visit(state, plan, plan.getRootBlock());
+    annotatedPlanVerifier.visit(state, plan, plan.getRootBlock());
 
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
@@ -371,23 +378,18 @@ public class GlobalEngine extends AbstractService {
     @Override
     public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-      if (rootNode.getChild().getType() == NodeType.STORE) {
-        StoreTableNode storeTableNode = rootNode.getChild();
-        return storeTableNode.isCreatedTable();
-      } else {
-        return false;
-      }
+      return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
     }
 
     @Override
     public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-      StoreTableNode storeTableNode = rootNode.getChild();
-      String tableName = storeTableNode.getTableName();
+      CreateTableNode createTableNode = rootNode.getChild();
+      String tableName = createTableNode.getTableName();
       queryContext.setOutputTable(tableName);
       queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
-      if(storeTableNode.getPartitions() != null) {
-        queryContext.setPartitions(storeTableNode.getPartitions());
+      if(createTableNode.getPartitions() != null) {
+        queryContext.setPartitions(createTableNode.getPartitions());
       }
       queryContext.setCreateTable();
     }
@@ -405,122 +407,24 @@ public class GlobalEngine extends AbstractService {
       queryContext.setInsert();
 
       InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
-      StoreTableNode storeNode;
 
       // Set QueryContext settings, such as output table name and output path.
       // It also remove data files if overwrite is true.
       String outputTableName;
       Path outputPath;
-      CatalogProtos.StoreType storeType;
-      Options options = new Options();
       if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
-        TableDesc desc = insertNode.getTargetTable();
-        outputTableName = desc.getName();
-        outputPath = desc.getPath();
-        queryContext.setOutputTable(outputTableName);
-
-        // set default values
-        options.putAll(desc.getMeta().getOptions());
-        storeType = desc.getMeta().getStoreType();
+        queryContext.setOutputTable(insertNode.getTableName());
+        queryContext.setOutputPath(insertNode.getPath());
       } else { // INSERT INTO LOCATION ...
-        outputTableName = PlannerUtil.normalizeTableName(insertNode.getPath().getName());
+        // When INSERT INTO LOCATION, must not set output table.
         outputPath = insertNode.getPath();
         queryContext.setFileOutput();
-
-        // set default values
-        options = new Options();
-        storeType = CatalogProtos.StoreType.CSV;
+        queryContext.setOutputPath(outputPath);
       }
 
-      // overwrite the store type if store type is specified in the query statement
-      if (insertNode.hasStorageType()) {
-        storeType = insertNode.getStorageType();
-      }
-
-      // overwrite the table properties if they are specified in the query statement
-      if (insertNode.hasOptions()) {
-        options.putAll(insertNode.getOptions());
-      }
-
-      storeNode = new StoreTableNode(plan.newPID(), outputTableName);
-      storeNode.setStorageType(storeType);
-      storeNode.setOptions(options);
-
-      // set OutputPath
-      queryContext.setOutputPath(outputPath);
-
       if (insertNode.isOverwrite()) {
         queryContext.setOutputOverwrite();
       }
-
-      ////////////////////////////////////////////////////////////////////////////////////
-      //             [TARGET TABLE]  [TARGET COLUMN]         [SUBQUERY Schema]           /
-      // INSERT INTO    TB_NAME      (col1, col2)     SELECT    c1,   c2        FROM ... /
-      ////////////////////////////////////////////////////////////////////////////////////
-      LogicalNode subQuery = insertNode.getSubQuery();
-      Schema subQueryOutSchema = subQuery.getOutSchema();
-
-      if (insertNode.hasTargetTable()) { // if a target table is given, it computes the proper schema.
-        storeNode.getOptions().putAll(insertNode.getTargetTable().getMeta().toMap());
-
-        Schema targetTableSchema = insertNode.getTargetTable().getSchema();
-        Schema targetProjectedSchema = insertNode.getTargetSchema();
-
-        int [] targetColumnIds = new int[targetProjectedSchema.getColumnNum()];
-        int idx = 0;
-        for (Column column : targetProjectedSchema.getColumns()) {
-          targetColumnIds[idx++] = targetTableSchema.getColumnId(column.getQualifiedName());
-        }
-
-        Target [] targets = new Target[targetTableSchema.getColumnNum()];
-        boolean matched = false;
-        for (int i = 0; i < targetTableSchema.getColumnNum(); i++) {
-          Column column = targetTableSchema.getColumn(i);
-          for (int j = 0; j < targetColumnIds.length; j++) {
-            if (targetColumnIds[j] == i) {
-              Column outputColumn = subQueryOutSchema.getColumn(j);
-              targets[i] = new Target(new FieldEval(outputColumn), column.getColumnName());
-              matched = true;
-              break;
-            }
-          }
-          if (!matched) {
-            targets[i] = new Target(new ConstEval(NullDatum.get()), column.getColumnName());
-          }
-          matched = false;
-        }
-
-
-        ProjectionNode projectionNode = new ProjectionNode(plan.newPID());
-        projectionNode.setTargets(targets);
-        projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
-        List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());
-        projectionNode.setChild(blocks.get(0).getRoot());
-
-        storeNode.setOutSchema(projectionNode.getOutSchema());
-        storeNode.setInSchema(projectionNode.getOutSchema());
-        storeNode.setChild(projectionNode);
-      } else {
-        storeNode.setOutSchema(subQueryOutSchema);
-        storeNode.setInSchema(subQueryOutSchema);
-        List<LogicalPlan.QueryBlock> childBlocks = plan.getChildBlocks(plan.getRootBlock());
-        storeNode.setChild(childBlocks.get(0).getRoot());
-      }
-
-      // If InsertNode contains table partition information, StoreNode must has it.
-      if (insertNode.hasTargetTable()) {
-        if (insertNode.getTargetTable().getPartitions() != null) {
-          storeNode.setPartitions(insertNode.getTargetTable().getPartitions());
-        }
-      }
-
-      // find a subquery query of insert node and merge root block and subquery into one query block.
-      PlannerUtil.replaceNode(plan.getRootBlock().getRoot(), storeNode, NodeType.INSERT);
-      plan.getRootBlock().refresh();
-      LogicalPlan.QueryBlock subBlock = plan.getBlock(insertNode.getSubQuery());
-      // remove the sub block and connection from a block graph.
-      plan.removeBlock(subBlock);
-      plan.getQueryBlockGraph().removeEdge(subBlock.getName(), LogicalPlan.ROOT_BLOCK);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 22c3a35..7e4540b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -35,12 +35,15 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
@@ -327,7 +330,7 @@ public class Query implements EventHandler<QueryEvent> {
 
             if (queryContext.hasOutputTable()) { // TRUE only if a query command is 'CREATE TABLE' OR 'INSERT INTO'
               if (queryContext.isOutputOverwrite()) { // TRUE only if a query is 'INSERT OVERWRITE INTO'
-                catalog.deleteTable(finalOutputDir.getName());
+                catalog.deleteTable(finalTableDesc.getName());
               }
               catalog.addTable(finalTableDesc);
             }
@@ -378,12 +381,25 @@ public class Query implements EventHandler<QueryEvent> {
                                                   Path finalOutputDir) {
       // Determine the output table name
       SubQuery subQuery = query.getSubQuery(finalExecBlockId);
-      QueryContext queryContext = query.context.getQueryContext();
+
       String outputTableName;
-      if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
-        outputTableName = queryContext.getOutputTable();
-      } else { // SELECT STATEMENT
-        outputTableName = query.getId().toString();
+      PartitionDesc partitionDesc = null;
+      QueryContext queryContext = query.context.getQueryContext();
+      if (subQuery.getBlock().getPlan().getType() == NodeType.CREATE_TABLE) {
+        CreateTableNode createTableNode = (CreateTableNode) subQuery.getBlock().getPlan();
+        outputTableName = createTableNode.getTableName();
+        if (createTableNode.hasPartition()) {
+          partitionDesc = createTableNode.getPartitions();
+        }
+      } else {
+        if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+          outputTableName = queryContext.getOutputTable();
+        } else { // SELECT STATEMENT
+          outputTableName = query.getId().toString();
+        }
+        if(queryContext.hasPartitions()) {
+          partitionDesc = queryContext.getPartitions();
+        }
       }
 
       TableMeta meta = subQuery.getTableMeta();
@@ -410,8 +426,8 @@ public class Query implements EventHandler<QueryEvent> {
         }
       }
 
-      if(queryContext.hasPartitions()) {
-        finalTableDesc.setPartitions(queryContext.getPartitions());
+      if (partitionDesc != null) {
+        finalTableDesc.setPartitions(partitionDesc);
       }
 
       return finalTableDesc;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index cd93a8f..6758189 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -424,7 +424,7 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.ROOT, plan.getType());
     LogicalRootNode root = (LogicalRootNode) plan;
 
-    assertEquals(NodeType.STORE, root.getChild().getType());
+    assertEquals(NodeType.CREATE_TABLE, root.getChild().getType());
     StoreTableNode storeNode = root.getChild();
     testQuery7(storeNode.getChild());
   }
@@ -776,31 +776,31 @@ public class TestLogicalPlanner {
   public final void testInsertInto0() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[0]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertFalse(insertNode.isOverwrite());
     assertTrue(insertNode.hasTargetTable());
-    assertEquals("score", insertNode.getTargetTable().getName());
+    assertEquals("score", insertNode.getTableName());
   }
 
   @Test
   public final void testInsertInto1() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[1]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertFalse(insertNode.isOverwrite());
-    assertEquals("score", insertNode.getTargetTable().getName());
+    assertEquals("score", insertNode.getTableName());
   }
 
   @Test
   public final void testInsertInto2() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[2]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertFalse(insertNode.isOverwrite());
-    assertEquals("employee", insertNode.getTargetTable().getName());
+    assertEquals("employee", insertNode.getTableName());
     assertTrue(insertNode.hasTargetSchema());
     assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
     assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
@@ -810,7 +810,7 @@ public class TestLogicalPlanner {
   public final void testInsertInto3() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[3]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertFalse(insertNode.isOverwrite());
     assertTrue(insertNode.hasPath());
@@ -820,11 +820,11 @@ public class TestLogicalPlanner {
   public final void testInsertInto4() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[4]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertTrue(insertNode.isOverwrite());
     assertTrue(insertNode.hasTargetTable());
-    assertEquals("employee", insertNode.getTargetTable().getName());
+    assertEquals("employee", insertNode.getTableName());
     assertTrue(insertNode.hasTargetSchema());
     assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
     assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
@@ -834,7 +834,7 @@ public class TestLogicalPlanner {
   public final void testInsertInto5() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(insertStatements[5]);
     LogicalPlan plan = planner.createPlan(expr);
-    assertEquals(2, plan.getQueryBlocks().size());
+    assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertTrue(insertNode.isOverwrite());
     assertTrue(insertNode.hasPath());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index b770de5..8b2db9f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -94,13 +94,13 @@ public class TestInsertQuery {
     assertTrue(catalog.existsTable(tableName));
     TableDesc originalDesc = catalog.getTableDesc(tableName);
 
-    res = tpch.execute("insert overwrite into " + tableName
-        + " (col1, col3) select l_orderkey, l_quantity from lineitem");
+    res = tpch.execute("insert overwrite into " + tableName + " (col1, col3) select l_orderkey, l_quantity from lineitem");
     res.close();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertEquals(5, desc.getStats().getNumRows().intValue());
 
     res = tpch.execute("select * from " + tableName);
+
     assertTrue(res.next());
     assertEquals(1, res.getLong(1));
     assertTrue(0f == res.getFloat(2));