You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/04 17:51:14 UTC

[2/3] TAJO-144: Implement INSERT OVERWRITE clause. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
new file mode 100644
index 0000000..6655317
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
+
+public class QueryMeta extends Options {
+
+  public QueryMeta() {}
+
+  public QueryMeta(KeyValueSetProto proto) {
+    super(proto);
+  }
+
+  public void put(TajoConf.ConfVars key, String value) {
+    put(key.varname, value);
+  }
+
+  public String get(TajoConf.ConfVars key) {
+    return get(key.varname);
+  }
+
+  public String get(String key) {
+    return super.get(key);
+  }
+
+  public void setUser(String username) {
+    put(TajoConf.ConfVars.QUERY_USERNAME, username);
+  }
+
+  public String getUser() {
+    return get(TajoConf.ConfVars.QUERY_USERNAME);
+  }
+
+  public void setOutputTable(String tableName) {
+    put(TajoConf.ConfVars.QUERY_OUTPUT_TABLE, PlannerUtil.normalizeTableName(tableName));
+  }
+
+  public String getOutputTable() {
+    return get(TajoConf.ConfVars.QUERY_OUTPUT_TABLE);
+  }
+
+  public void setOutputPath(Path path) {
+    // it is determined in QueryMaster.initStagingDir().
+    put(TajoConf.ConfVars.QUERY_OUTPUT_DIR, path.toUri().toString());
+  }
+
+  public Path getOutputPath() {
+    return new Path(get(TajoConf.ConfVars.QUERY_OUTPUT_DIR));
+  }
+
+  public void setOutputOverwrite() {
+    put("tajo.output.overwrite", "1");
+  }
+
+  public boolean isOutputOverwrite() {
+    String overwrite = get("tajo.output.overwrite");
+    if (overwrite != null && overwrite.equals("1")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void setFileOutput() {
+    put("tajo.output.fileoutput", "1");
+  }
+
+  public boolean isFileOutput() {
+    String fileoutput = get("tajo.output.fileoutput");
+    if (fileoutput != null && fileoutput.equals("1")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void setCreateTable() {
+    put("tajo.query.command", CreateTableNode.class.getSimpleName());
+  }
+
+  public boolean isCreateTable() {
+    String command = get("tajo.query.command");
+    return command != null && command.equals(CreateTableNode.class.getSimpleName());
+  }
+
+  public void setInsert() {
+    put("tajo.query.command", InsertNode.class.getSimpleName());
+  }
+
+  public boolean isInsert() {
+    String command = get("tajo.query.command");
+    return command != null && command.equals(InsertNode.class.getSimpleName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 9b454f6..b26d05b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -118,7 +118,6 @@ public class TaskSchedulerImpl extends AbstractService
 
           schedule();
         }
-        //req.getCallback().run(stopTaskRunnerReq);
         LOG.info("TaskScheduler schedulingThread stopped");
       }
     };
@@ -459,7 +458,8 @@ public class TaskSchedulerImpl extends AbstractService
               new ArrayList<Fragment>(task.getAllFragments()),
               task.getOutputName(),
               false,
-              task.getLogicalPlan().toJson());
+              task.getLogicalPlan().toJson(),
+              context.getQueryMeta());
           if (task.getStoreTableNode().isLocal()) {
             taskAssign.setInterQuery();
           }
@@ -503,7 +503,8 @@ public class TaskSchedulerImpl extends AbstractService
               Lists.newArrayList(task.getAllFragments()),
               task.getOutputName(),
               false,
-              task.getLogicalPlan().toJson());
+              task.getLogicalPlan().toJson(),
+              context.getQueryMeta());
           if (task.getStoreTableNode().isLocal()) {
             taskAssign.setInterQuery();
           }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 6ae8ff7..54cb0fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -20,18 +20,24 @@ package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.master.QueryMeta;
 
+/**
+ * This event is conveyed to QueryMaster.
+ */
 public class QueryStartEvent extends AbstractEvent {
   public enum EventType {
     QUERY_START
   }
 
   private QueryId queryId;
+  private QueryMeta queryMeta;
   private String logicalPlanJson;
 
-  public QueryStartEvent(QueryId queryId, String logicalPlanJson) {
+  public QueryStartEvent(QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
     super(EventType.QUERY_START);
     this.queryId = queryId;
+    this.queryMeta = queryMeta;
     this.logicalPlanJson = logicalPlanJson;
   }
 
@@ -39,6 +45,10 @@ public class QueryStartEvent extends AbstractEvent {
     return queryId;
   }
 
+  public QueryMeta getQueryMeta() {
+    return this.queryMeta;
+  }
+
   public String getLogicalPlanJson() {
     return logicalPlanJson;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 99b7c62..d1e92be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -29,9 +29,13 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableDescImpl;
+import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.master.ExecutionBlock;
 import org.apache.tajo.master.ExecutionBlockCursor;
 import org.apache.tajo.master.event.*;
@@ -57,7 +61,7 @@ public class Query implements EventHandler<QueryEvent> {
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   private final StorageManager sm;
-  private QueryMasterTask.QueryContext context;
+  QueryMasterTask.QueryContext context;
   private ExecutionBlockCursor cursor;
 
   // Query Status
@@ -305,19 +309,29 @@ public class Query implements EventHandler<QueryEvent> {
         } else { // Finish a query
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
             SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
-            TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
-                subQuery.getTableMeta(), query.context.getOutputPath());
-            query.setResultDesc(desc);
-            try {
-              query.writeStat(query.context.getOutputPath(), subQuery);
-            } catch (IOException e) {
-              e.printStackTrace();
+            TableDesc outputTableDesc = new TableDescImpl(query.context.getQueryMeta().getOutputTable(),
+                subQuery.getTableMeta(), query.context.getQueryMeta().getOutputPath());
+            query.setResultDesc(outputTableDesc);
+
+            if (!query.context.getQueryMeta().isFileOutput()) {
+              try {
+                query.writeStat(query.context.getQueryMeta().getOutputPath(), subQuery);
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
             }
             query.eventHandler.handle(new QueryFinishEvent(query.getId()));
 
-            if (query.context.isCreateTableQuery()) {
-              // TOOD move to QueryJobManager
-              //query.context.getCatalog().addTable(desc);
+            StoreTableNode storeTableNode = (StoreTableNode) PlannerUtil.findTopNode(subQuery.getBlock().getPlan(),
+                NodeType.STORE);
+            if (storeTableNode.isCreatedTable()) {
+              query.context.getQueryMasterContext().getWorkerContext().getCatalog().addTable(outputTableDesc);
+            } else if (storeTableNode.isOverwrite() && !query.context.getQueryMeta().isFileOutput()) {
+              CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+              TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
+              updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
+              catalog.deleteTable(outputTableDesc.getName());
+              catalog.addTable(updatingTable);
             }
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 10be0a3..f6cf02e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,6 +29,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.QueryMeta;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -45,6 +46,8 @@ public class QueryInProgress extends CompositeService {
 
   private QueryId queryId;
 
+  private QueryMeta queryMeta;
+
   private TajoAsyncDispatcher dispatcher;
 
   private LogicalRootNode plan;
@@ -63,9 +66,11 @@ public class QueryInProgress extends CompositeService {
 
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
+      QueryMeta queryMeta,
       QueryId queryId, String sql, LogicalRootNode plan) {
     super(QueryInProgress.class.getName());
     this.masterContext = masterContext;
+    this.queryMeta = queryMeta;
     this.queryId = queryId;
     this.plan = plan;
 
@@ -195,12 +200,13 @@ public class QueryInProgress extends CompositeService {
         //TODO wait
         return;
       }
-      LOG.info("====>Call executeQuery to :" +
+      LOG.info("Call executeQuery to :" +
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
       queryMasterRpcClient.executeQuery(
           null,
           TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
               .setQueryId(queryId.getProto())
+              .setQueryMeta(queryMeta.getProto())
               .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
               .build(), NullCallback.get());
       querySubmitted.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 16008fc..fb92616 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.QueryMeta;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 
@@ -84,9 +85,9 @@ public class QueryJobManager extends CompositeService {
     return dispatcher.getEventHandler();
   }
 
-  public QueryInfo createNewQueryJob(String sql, LogicalRootNode plan) throws Exception {
+  public QueryInfo createNewQueryJob(QueryMeta queryMeta, String sql, LogicalRootNode plan) throws Exception {
     QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryId, sql, plan);
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext,queryMeta, queryId, sql, plan);
 
     synchronized(runningQueries) {
       runningQueries.put(queryId, queryInProgress);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 50ec5be..53b9c05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -60,7 +60,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private GlobalOptimizer globalOptimizer;
 
-//  private boolean isCreateTableStmt;
   private StorageManager storageManager;
 
   private QueryConf queryConf;
@@ -88,7 +87,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       queryConf = new QueryConf(conf);
       queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
 
-      QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+      QUERY_SESSION_TIMEOUT = 60 * 1000;
       queryMasterContext = new QueryMasterContext(queryConf);
 
       clock = new SystemClock();
@@ -112,7 +111,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   @Override
   public void start() {
-    LOG.info("====>QueryMaster start");
+    LOG.info("QueryMaster start");
 
     queryHeartbeatThread = new QueryHeartbeatThread();
     queryHeartbeatThread.start();
@@ -253,10 +252,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
   private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
     @Override
     public void handle(QueryStartEvent event) {
-      LOG.info("====>Start QueryStartEventHandler:" + event.getQueryId());
+      LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       //To change body of implemented methods use File | Settings | File Templates.
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getQueryMeta(), event.getLogicalPlanJson());
 
       queryMasterTask.init(queryConf);
       queryMasterTask.start();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index cfea8a2..f348e44 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -39,6 +39,7 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.master.QueryMeta;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -64,6 +65,8 @@ public class QueryMasterTask extends CompositeService {
 
   private QueryId queryId;
 
+  private QueryMeta queryMeta;
+
   private QueryContext queryContext;
 
   private QueryMaster.QueryMasterContext queryMasterContext;
@@ -78,8 +81,6 @@ public class QueryMasterTask extends CompositeService {
 
   private final long querySubmitTime;
 
-  private boolean isCreateTableStmt;
-
   private Path outputPath;
 
   private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
@@ -93,10 +94,11 @@ public class QueryMasterTask extends CompositeService {
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, String logicalPlanJson) {
+                         QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
     this.queryId = queryId;
+    this.queryMeta = queryMeta;
     this.logicalPlanJson = logicalPlanJson;
     this.querySubmitTime = System.currentTimeMillis();
   }
@@ -269,13 +271,12 @@ public class QueryMasterTask extends CompositeService {
     realUser = ugi.getShortUserName();
     currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
 
-    String givenOutputTableName = queryConf.getOutputTable();
+    String givenOutputTableName = queryMeta.getOutputTable();
     Path stagingDir;
 
     // If final output directory is not given by an user,
     // we use the query id as a output directory.
     if (givenOutputTableName == null || givenOutputTableName.isEmpty()) {
-      this.isCreateTableStmt = false;
       FileSystem defaultFS = FileSystem.get(queryConf);
 
       Path homeDirectory = defaultFS.getHomeDirectory();
@@ -318,25 +319,31 @@ public class QueryMasterTask extends CompositeService {
       }
 
       // Set the query id to the output table name
-      queryConf.setOutputTable(queryId.toString());
+      queryMeta.setOutputTable(queryId.toString());
+
+    } else { // if a output table is given
 
-    } else {
-      this.isCreateTableStmt = true;
       Path warehouseDir = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR),
           TajoConstants.WAREHOUSE_DIR);
-      stagingDir = new Path(warehouseDir, queryConf.getOutputTable());
-
       FileSystem fs = warehouseDir.getFileSystem(queryConf);
-      if (fs.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + " already exists. The directory must be unique to each query");
+
+      if (queryMeta.isFileOutput()) {
+        stagingDir = queryMeta.getOutputPath();
       } else {
-        // TODO - should have appropriate permission
-        fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+        stagingDir = new Path(warehouseDir, queryMeta.getOutputTable());
       }
+
+      if (!queryMeta.isOutputOverwrite()) {
+        if (fs.exists(stagingDir)) {
+          throw new IOException("The staging directory " + stagingDir
+              + " already exists. The directory must be unique to each query");
+        }
+      }
+
+      fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
     }
 
-    queryConf.setOutputPath(stagingDir);
+    queryMeta.setOutputPath(stagingDir);
     outputPath = stagingDir;
     LOG.info("Initialized Query Staging Dir: " + outputPath);
   }
@@ -383,6 +390,10 @@ public class QueryMasterTask extends CompositeService {
       return queryMasterContext;
     }
 
+    public QueryMeta getQueryMeta() {
+      return queryMeta;
+    }
+
     public QueryConf getConf() {
       return queryConf;
     }
@@ -407,10 +418,6 @@ public class QueryMasterTask extends CompositeService {
       return outputPath;
     }
 
-    public boolean isCreateTableQuery() {
-      return isCreateTableStmt;
-    }
-
     public synchronized EventHandler getEventHandler() {
       if(eventHandler == null) {
         eventHandler = dispatcher.getEventHandler();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 89d3fed..ecde59c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -46,11 +46,8 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.TaskScheduler;
-import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageManager;
@@ -72,6 +69,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
+  private QueryMeta queryMeta;
   private ExecutionBlock block;
   private int priority;
   private TableMeta meta;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index e6ec9c0..1e9d6e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -189,7 +189,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
 
       if(!fs.exists(queryConfPath)){
-        LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConf.getOutputPath());
+        LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConfPath);
         writeConf(queryConf, queryConfPath);
       } else {
         LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index bb27ee4..0fc896b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -21,13 +21,14 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryMaster;
@@ -36,11 +37,13 @@ import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.webapp.StaticHttpServer;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -64,11 +67,15 @@ public class TajoWorker extends CompositeService {
 
   private TajoWorkerManagerService tajoWorkerManagerService;
 
+  private InetSocketAddress tajoMasterAddress;
+
   //to TajoMaster
   private ProtoAsyncRpcClient tajoMasterRpc;
 
   private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
 
+  private CatalogClient catalogClient;
+
   private WorkerContext workerContext;
 
   private TaskRunnerManager taskRunnerManager;
@@ -171,17 +178,14 @@ public class TajoWorker extends CompositeService {
       workerHeartbeatThread.interrupt();
     }
 
-//    try {
-//      FileSystem.closeAll();
-//    } catch (IOException e) {
-//      LOG.error(e.getMessage(), e);
-//    }
+    if (catalogClient != null) {
+      catalogClient.close();
+    }
+
     if(tajoMasterRpc != null) {
       tajoMasterRpc.close();
     }
-//    for(Service eachService: getServices()) {
-//      System.out.println("Service:" + eachService);
-//    }
+
     super.stop();
     LOG.info("TajoWorker main thread exiting");
   }
@@ -207,6 +211,10 @@ public class TajoWorker extends CompositeService {
       return taskRunnerManager;
     }
 
+    public CatalogService getCatalog() {
+      return catalogClient;
+    }
+
     public TajoPullServerService getPullService() {
       return pullService;
     }
@@ -246,9 +254,10 @@ public class TajoWorker extends CompositeService {
   private void setWorkerMode(String[] params) {
     if("qm".equals(daemonMode)) {
       //QueryMaster mode
-      String tajoMasterAddress = params[2];
 
+      String tajoMasterAddress = params[2];
       connectToTajoMaster(tajoMasterAddress);
+      connectToCatalog();
 
       QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
       tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
@@ -259,21 +268,24 @@ public class TajoWorker extends CompositeService {
     } else {
       //Standby mode
       connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+      connectToCatalog();
       workerHeartbeatThread = new WorkerHeartbeatThread();
       workerHeartbeatThread.start();
     }
   }
 
-  private void connectToTajoMaster(String tajoMasterAddress) {
-    LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
-    InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+  private void connectToTajoMaster(String tajoMasterAddrString) {
+    LOG.info("Connecting to TajoMaster (" + tajoMasterAddrString +")");
+    this.tajoMasterAddress = NetUtils.createSocketAddr(tajoMasterAddrString);
+
     while(true) {
       try {
-        tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
+        tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
         tajoMasterRpcClient = tajoMasterRpc.getStub();
         break;
       } catch (Exception e) {
-        LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
+        LOG.error("Can't connect to TajoMaster[" + NetUtils.normalizeInetSocketAddress(tajoMasterAddress) + "], "
+            + e.getMessage(), e);
       }
 
       try {
@@ -283,6 +295,17 @@ public class TajoWorker extends CompositeService {
     }
   }
 
+  private void connectToCatalog() {
+    // TODO: To be improved. it's a hack. It assumes that CatalogServer is embedded in TajoMaster.
+    String hostName = this.tajoMasterAddress.getHostName();
+    int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
+    try {
+      catalogClient = new CatalogClient(hostName, port);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
   class WorkerHeartbeatThread extends Thread {
     TajoMasterProtocol.ServerStatusProto.System systemInfo;
     List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 3969b36..e1fd88a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -100,11 +100,11 @@ public class TajoWorkerClientService extends AbstractService {
 
   @Override
   public void stop() {
-    LOG.info("====> TajoWorkerClientService stopping");
+    LOG.info("TajoWorkerClientService stopping");
     if(rpcServer != null) {
       rpcServer.shutdown();
     }
-    LOG.info("====> TajoWorkerClientService stopped");
+    LOG.info("TajoWorkerClientService stopped");
     super.stop();
   }
 
@@ -174,7 +174,10 @@ public class TajoWorkerClientService extends AbstractService {
           builder.setProgress(query.getProgress());
           builder.setSubmitTime(query.getAppSubmitTime());
           builder.setInitTime(query.getInitializationTime());
-          builder.setHasResult(!queryMasterTask.getQueryContext().isCreateTableQuery());
+          builder.setHasResult(
+              !(queryMasterTask.getQueryContext().getQueryMeta().isCreateTable() ||
+                  queryMasterTask.getQueryContext().getQueryMeta().isInsert())
+          );
           if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
             builder.setFinishTime(query.getFinishTime());
           } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index a48339a..6fde6e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,6 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.QueryMeta;
 import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.querymaster.QueryMaster;
@@ -199,8 +200,9 @@ public class TajoWorkerManagerService extends CompositeService
                            RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
       QueryId queryId = new QueryId(request.getQueryId());
-      LOG.info("====>Receive executeQuery request:" + queryId);
-      queryMaster.handle(new QueryStartEvent(queryId, request.getLogicalPlanJson().getValue()));
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new QueryMeta(request.getQueryMeta()), request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index f9cc82c..aef5ead 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -48,6 +48,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.master.QueryMeta;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
@@ -69,6 +70,7 @@ public class Task {
   private static final Log LOG = LogFactory.getLog(Task.class);
 
   private final QueryConf conf;
+  private final QueryMeta queryMeta;
   private final FileSystem localFS;
   private final TaskRunner.TaskRunnerContext taskRunnerContext;
   private final Interface masterProxy;
@@ -136,6 +138,7 @@ public class Task {
 
     this.taskId = request.getId();
     this.conf = worker.getQueryConf();
+    this.queryMeta = request.getQueryMeta();
     this.taskRunnerContext = worker;
     this.masterProxy = masterProxy;
     this.localFS = worker.getLocalFS();
@@ -160,7 +163,7 @@ public class Task {
     } else {
       // The final result of a task will be written in a file named part-ss-nnnnnnn,
       // where ss is the subquery id associated with this task, and nnnnnn is the task id.
-      Path outFilePath = new Path(conf.getOutputPath(),
+      Path outFilePath = new Path(queryMeta.getOutputPath(),
           OUTPUT_FILE_PREFIX +
           OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
           OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 8493be2..5802ade 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -39,7 +39,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -106,20 +105,6 @@ public class TaskRunner extends AbstractService {
 
   private TaskRunnerManager taskRunnerManager;
 
-  public TaskRunner(
-      final ExecutionBlockId executionBlockId,
-      final NodeId nodeId,
-      UserGroupInformation taskOwner,
-      Interface master, ContainerId containerId) {
-    super(TaskRunner.class.getName());
-    this.executionBlockId = executionBlockId;
-    this.queryId = executionBlockId.getQueryId();
-    this.nodeId = nodeId;
-    this.taskOwner = taskOwner;
-    this.master = master;
-    this.containerId = containerId;
-  }
-
   public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
     super(TaskRunner.class.getName());
 
@@ -127,10 +112,7 @@ public class TaskRunner extends AbstractService {
     try {
       final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
 
-      conf.setOutputPath(new Path(args[6]));
-
       LOG.info("NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
-      LOG.info("OUTPUT DIR: " + conf.getOutputPath());
       LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
 
       UserGroupInformation.setConfiguration(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 88a2029..0694b4e 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -59,6 +59,7 @@ message QueryUnitRequestProto {
     optional bool interQuery = 6 [default = false];
     repeated Fetch fetches = 7;
     optional bool shouldDie = 8;
+    optional KeyValueSetProto queryMeta = 9;
 }
 
 message Fetch {
@@ -104,7 +105,8 @@ message Partition {
 
 message QueryExecutionRequestProto {
     required QueryIdProto queryId = 1;
-    required StringProto logicalPlanJson = 2;
+    required KeyValueSetProto queryMeta = 2;
+    required StringProto logicalPlanJson = 3;
 }
 
 message GetTaskRequestProto {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index f031938..ab8d4b4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -133,7 +133,7 @@ public class TajoTestingCluster {
 		String dirStr = getTestDir(randomStr).toString();
 		File dir = new File(dirStr).getAbsoluteFile();
 		// Have it cleaned up on exit
-		//dir.deleteOnExit();
+		dir.deleteOnExit();
 		return dir;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 2efaadc..e61a415 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.Tuple;
@@ -124,7 +125,7 @@ public class TestEvalTree {
       "select name from people where NOT (20 > 30)", // 5
   };
 
-  public static Target[] getRawTargets(String query) {
+  public static Target[] getRawTargets(String query) throws PlanningException {
     Expr expr = analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(expr);
     Target [] targets = plan.getRootBlock().getTargetListManager().getUnEvaluatedTargets();
@@ -136,7 +137,12 @@ public class TestEvalTree {
 
   public static EvalNode getRootSelection(String query) {
     Expr block = analyzer.parse(query);
-    LogicalPlan plan = planner.createPlan(block);
+    LogicalPlan plan = null;
+    try {
+      plan = planner.createPlan(block);
+    } catch (PlanningException e) {
+      e.printStackTrace();
+    }
     EvalNode qual = plan.getRootBlock().getSelectionNode().getQual();
     assertJsonSerDer(qual);
     return qual;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 1a24ec8..9b974cc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -31,6 +31,7 @@ import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.EvalExprNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
@@ -96,7 +97,12 @@ public class TestEvalTreeUtil {
 
   public static Target [] getRawTargets(String query) {
     Expr expr = analyzer.parse(query);
-    LogicalPlan plan = planner.createPlan(expr);
+    LogicalPlan plan = null;
+    try {
+      plan = planner.createPlan(expr);
+    } catch (PlanningException e) {
+      e.printStackTrace();
+    }
     if (plan.getRootBlock().getRoot().getType() == NodeType.EXPRS) {
       return ((EvalExprNode)plan.getRootBlock().getRoot()).getExprs();
     } else {
@@ -106,7 +112,12 @@ public class TestEvalTreeUtil {
 
   public static EvalNode getRootSelection(String query) {
     Expr block = analyzer.parse(query);
-    LogicalPlan plan = planner.createPlan(block);
+    LogicalPlan plan = null;
+    try {
+      plan = planner.createPlan(block);
+    } catch (PlanningException e) {
+      e.printStackTrace();
+    }
     return plan.getRootBlock().getSelectionNode().getQual();
   }
 
@@ -173,7 +184,7 @@ public class TestEvalTreeUtil {
   }
   
   @Test
-  public final void testGetContainExprs() throws CloneNotSupportedException {
+  public final void testGetContainExprs() throws CloneNotSupportedException, PlanningException {
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalPlan plan = planner.createPlan(expr);
     Target [] targets = plan.getRootBlock().getTargetListManager().getUnEvaluatedTargets();
@@ -236,7 +247,7 @@ public class TestEvalTreeUtil {
   }
   
   @Test
-  public final void testSimplify() {
+  public final void testSimplify() throws PlanningException {
     Target [] targets = getRawTargets(QUERIES[0]);
     EvalNode node = AlgebraicUtil.simplify(targets[0].getEvalTree());
     EvalContext nodeCtx = node.newContext();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 5ba26f8..75a531f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -78,18 +78,6 @@ public class TestSQLAnalyzer {
   }
 
   @Test
-  public void testInsert1() throws IOException {
-    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_1.sql"));
-    parseQuery(sql);
-  }
-
-  @Test
-  public void testInsert2() throws IOException {
-    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_2.sql"));
-    parseQuery(sql);
-  }
-
-  @Test
   public void testGroupby1() throws IOException {
     String sql = FileUtil.readTextFile(new File("src/test/queries/groupby_1.sql"));
     parseQuery(sql);
@@ -251,6 +239,42 @@ public class TestSQLAnalyzer {
     parseQuery(sql);
   }
 
+  @Test
+  public void testInsertIntoTable() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_1.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInsertIntoLocation() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_2.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInsertIntoTable2() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_3.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInsertOverwriteIntoTable() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_1.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInsertOverwriteIntoLocation() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_2.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
+  public void testInsertOverwriteIntoTable2() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_3.sql"));
+    parseQuery(sql);
+  }
+
   static String[] exprs = {
       "1 + 2", // 0
       "3 - 4", // 1

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 7c0b9d0..72cbfc2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -242,4 +242,11 @@ public class TestLogicalOptimizer {
     // Test for Join Node
     assertTrue(PlannerUtil.canBeEvaluated(selNode.getQual(), scanNode));
   }
+
+  @Test
+  public final void testInsertInto() throws CloneNotSupportedException, PlanningException {
+    Expr expr = sqlAnalyzer.parse(TestLogicalPlanner.insertStatements[0]);
+    LogicalPlan newPlan = planner.createPlan(expr);
+    optimizer.optimize(newPlan);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 980f435..7f95a85 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -44,8 +44,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestLogicalPlanner {
   private static TajoTestingCluster util;
@@ -62,7 +61,7 @@ public class TestLogicalPlanner {
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
       catalog.registerFunction(funcDesc);
     }
-    
+
     Schema schema = new Schema();
     schema.addColumn("name", Type.TEXT);
     schema.addColumn("empid", Type.INT4);
@@ -98,7 +97,7 @@ public class TestLogicalPlanner {
 
     // TPC-H Schema for Complex Queries
     String [] tpchTables = {
-        "part", "supplier", "partsupp", "nation", "region"
+        "part", "supplier", "partsupp", "nation", "region", "lineitem"
     };
     tpch = new TPCH();
     tpch.loadSchemas();
@@ -136,7 +135,7 @@ public class TestLogicalPlanner {
   };
 
   @Test
-  public final void testSingleRelation() throws CloneNotSupportedException {
+  public final void testSingleRelation() throws CloneNotSupportedException, PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[0]);
     LogicalPlan planNode = planner.createPlan(expr);
     LogicalNode plan = planNode.getRootBlock().getRoot();
@@ -168,7 +167,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testImplicityJoinPlan() throws CloneNotSupportedException {
+  public final void testImplicityJoinPlan() throws CloneNotSupportedException, PlanningException {
     // two relations
     Expr expr = sqlAnalyzer.parse(QUERIES[1]);
     LogicalPlan planNode = planner.createPlan(expr);
@@ -264,7 +263,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testNaturalJoinPlan() {
+  public final void testNaturalJoinPlan() throws PlanningException {
     // two relations
     Expr context = sqlAnalyzer.parse(JOINS[0]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
@@ -300,7 +299,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testInnerJoinPlan() {
+  public final void testInnerJoinPlan() throws PlanningException {
     // two relations
     Expr expr = sqlAnalyzer.parse(JOINS[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -336,7 +335,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testOuterJoinPlan() {
+  public final void testOuterJoinPlan() throws PlanningException {
     // two relations
     Expr expr = sqlAnalyzer.parse(JOINS[2]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -374,7 +373,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testGroupby() throws CloneNotSupportedException {
+  public final void testGroupby() throws CloneNotSupportedException, PlanningException {
     // without 'having clause'
     Expr context = sqlAnalyzer.parse(QUERIES[7]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
@@ -412,7 +411,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testMultipleJoin() throws IOException {
+  public final void testMultipleJoin() throws IOException, PlanningException {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File("src/test/queries/tpch_q2_simplified.tql")));
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -444,7 +443,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testStoreTable() throws CloneNotSupportedException {
+  public final void testStoreTable() throws CloneNotSupportedException, PlanningException {
     Expr context = sqlAnalyzer.parse(QUERIES[8]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
     TestLogicalNode.testCloneLogicalNode(plan);
@@ -460,7 +459,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testOrderBy() throws CloneNotSupportedException {
+  public final void testOrderBy() throws CloneNotSupportedException, PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[4]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -487,7 +486,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testLimit() throws CloneNotSupportedException {
+  public final void testLimit() throws CloneNotSupportedException, PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[12]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -506,7 +505,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testSPJPush() throws CloneNotSupportedException {
+  public final void testSPJPush() throws CloneNotSupportedException, PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[5]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -535,7 +534,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testSPJ() throws CloneNotSupportedException {
+  public final void testSPJ() throws CloneNotSupportedException, PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[6]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -543,7 +542,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testJson() {
+  public final void testJson() throws PlanningException {
 	  Expr expr = sqlAnalyzer.parse(QUERIES[9]);
 	  LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -560,7 +559,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testVisitor() {
+  public final void testVisitor() throws PlanningException {
     // two relations
     Expr expr = sqlAnalyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -585,7 +584,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testExprNode() {
+  public final void testExprNode() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(QUERIES[10]);
     LogicalPlan rootNode = planner.createPlan(expr);
     LogicalNode plan = rootNode.getRootBlock().getRoot();
@@ -611,7 +610,7 @@ public class TestLogicalPlanner {
 
 
   @Test
-  public final void testAlias1() {
+  public final void testAlias1() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(ALIAS[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     LogicalRootNode root = (LogicalRootNode) plan;
@@ -638,7 +637,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testAlias2() {
+  public final void testAlias2() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(ALIAS[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     LogicalRootNode root = (LogicalRootNode) plan;
@@ -657,7 +656,7 @@ public class TestLogicalPlanner {
   };
 
   @Test
-  public final void testCreateTableDef() {
+  public final void testCreateTableDef() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(CREATE_TABLE[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     LogicalRootNode root = (LogicalRootNode) plan;
@@ -741,7 +740,7 @@ public class TestLogicalPlanner {
   };
 
   @Test
-  public final void testCubeBy() {
+  public final void testCubeBy() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(CUBE_ROLLUP[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -778,7 +777,6 @@ public class TestLogicalPlanner {
     }
   }
 
-
   static final String setStatements [] = {
     "select deptName from employee where deptName like 'data%' union select deptName from score where deptName like 'data%'",
     "select deptName from employee union select deptName from score as s1 intersect select deptName from score as s2",
@@ -786,7 +784,7 @@ public class TestLogicalPlanner {
   };
 
   @Test
-  public final void testSetPlan() {
+  public final void testSetPlan() throws PlanningException {
     Expr expr = sqlAnalyzer.parse(setStatements[0]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -803,7 +801,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testSetPlan2() {
+  public final void testSetPlan2() throws PlanningException {
     // for testing multiple set statements
     Expr expr = sqlAnalyzer.parse(setStatements[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -820,7 +818,7 @@ public class TestLogicalPlanner {
   }
 
   @Test
-  public final void testSetPlan3() {
+  public final void testSetPlan3() throws PlanningException {
     // for testing multiple set statements
     Expr expr = sqlAnalyzer.parse(setStatements[2]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -839,8 +837,6 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.PROJECTION, intersect.getRightChild().getType());
   }
 
-
-
   static final String [] setQualifiers = {
     "select name, empid from employee",
     "select distinct name, empid from employee",
@@ -848,7 +844,7 @@ public class TestLogicalPlanner {
   };
 
   @Test
-  public void testSetQualifier() {
+  public void testSetQualifier() throws PlanningException {
     Expr context = sqlAnalyzer.parse(setQualifiers[0]);
     LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
@@ -881,4 +877,93 @@ public class TestLogicalPlanner {
     LogicalNode fromJson = CoreGsonHelper.fromJson(json, LogicalNode.class);
     assertEquals("JSON (de) serialization equivalence check", rootNode, fromJson);
   }
+
+  // Table descriptions
+  //
+  // employee (name text, empid int4, deptname text)
+  // dept (deptname text, nameger text)
+  // score (deptname text, score inet4)
+
+  static final String [] insertStatements = {
+      "insert into score select name from employee",                        // 0
+      "insert into score select name, empid from employee",                 // 1
+      "insert into employee (name, deptname) select * from dept",           // 2
+      "insert into location '/tmp/data' select name, empid from employee",  // 3
+      "insert overwrite into employee (name, deptname) select * from dept", // 4
+      "insert overwrite into LOCATION '/tmp/data' select * from dept"       // 5
+  };
+
+  @Test
+  public final void testInsertInto0() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertFalse(insertNode.isOverwrite());
+    assertTrue(insertNode.hasTargetTable());
+    assertEquals("score", insertNode.getTargetTable().getName());
+  }
+
+  @Test
+  public final void testInsertInto1() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[1]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertFalse(insertNode.isOverwrite());
+    assertEquals("score", insertNode.getTargetTable().getName());
+  }
+
+  @Test
+  public final void testInsertInto2() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[2]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertFalse(insertNode.isOverwrite());
+    assertEquals("employee", insertNode.getTargetTable().getName());
+    assertTrue(insertNode.hasTargetSchema());
+    assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
+    assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
+  }
+
+  @Test
+  public final void testInsertInto3() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[3]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertFalse(insertNode.isOverwrite());
+    assertTrue(insertNode.hasPath());
+  }
+
+  @Test
+  public final void testInsertInto4() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[4]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertTrue(insertNode.isOverwrite());
+    assertTrue(insertNode.hasTargetTable());
+    assertEquals("employee", insertNode.getTargetTable().getName());
+    assertTrue(insertNode.hasTargetSchema());
+    assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
+    assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
+  }
+
+  @Test
+  public final void testInsertInto5() throws PlanningException {
+    Expr expr = sqlAnalyzer.parse(insertStatements[5]);
+    LogicalPlan plan = planner.createPlan(expr);
+    assertEquals(2, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+    assertTrue(insertNode.isOverwrite());
+    assertTrue(insertNode.hasPath());
+  }
+
+  private static InsertNode getInsertNode(LogicalPlan plan) {
+    LogicalRootNode root = (LogicalRootNode) plan.getRootBlock().getRoot();
+    assertEquals(NodeType.INSERT, root.getChild().getType());
+    return (InsertNode) root.getChild();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 1e95355..4821751 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -96,7 +96,7 @@ public class TestPlannerUtil {
   }
 
   @Test
-  public final void testTransformTwoPhase() {
+  public final void testTransformTwoPhase() throws PlanningException {
     // without 'having clause'
     Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[7]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -111,7 +111,7 @@ public class TestPlannerUtil {
   }
   
   @Test
-  public final void testTrasformTwoPhaseWithStore() {
+  public final void testTrasformTwoPhaseWithStore() throws PlanningException {
     Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[9]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     
@@ -141,7 +141,7 @@ public class TestPlannerUtil {
   }
   
   @Test
-  public final void testFindTopNode() throws CloneNotSupportedException {
+  public final void testFindTopNode() throws CloneNotSupportedException, PlanningException {
     // two relations
     Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[1]);
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index bea1eb6..97459f9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
@@ -129,7 +130,7 @@ public class TestBNLJoinExec {
           "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
 
   @Test
-  public final void testBNLCrossJoin() throws IOException {
+  public final void testBNLCrossJoin() throws IOException, PlanningException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -164,7 +165,7 @@ public class TestBNLJoinExec {
   }
 
   @Test
-  public final void testBNLInnerJoin() throws IOException {
+  public final void testBNLInnerJoin() throws IOException, PlanningException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 621f5a2..0fc3773 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -29,10 +29,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -110,7 +107,7 @@ public class TestExternalSortExec {
   };
 
   @Test
-  public final void testNext() throws IOException {
+  public final void testNext() throws IOException, PlanningException {
     Fragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 9422358..e270df3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -129,7 +130,7 @@ public class TestHashJoinExec {
   };
 
   @Test
-  public final void testHashInnerJoin() throws IOException {
+  public final void testHashInnerJoin() throws IOException, PlanningException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 9d85970..776882b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -33,6 +33,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
@@ -145,7 +146,7 @@ public class TestMergeJoinExec {
   };
 
   @Test
-  public final void testMergeInnerJoin() throws IOException {
+  public final void testMergeInnerJoin() throws IOException, PlanningException {
     Fragment[] empFrags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = sm.splitNG(conf, "people", people.getMeta(), people.getPath(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 7235924..9289dc9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -19,24 +19,27 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
@@ -129,7 +132,7 @@ public class TestNLJoinExec {
   };
   
   @Test
-  public final void testNLCrossJoin() throws IOException {
+  public final void testNLCrossJoin() throws IOException, PlanningException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -156,7 +159,7 @@ public class TestNLJoinExec {
   }
 
   @Test
-  public final void testNLInnerJoin() throws IOException {
+  public final void testNLInnerJoin() throws IOException, PlanningException {
     Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
new file mode 100644
index 0000000..e9a0c65
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestInsertQuery {
+  private static TpchTestBase tpch;
+  public TestInsertQuery() throws IOException {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  @Test
+  public final void testInsertOverwrite() throws Exception {
+    String tableName ="InsertOverwrite";
+    tpch.execute("create table " + tableName +" (col1 int8, col2 int4, col3 float4)");
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  @Test
+  public final void testInsertOverwriteSmallerColumns() throws Exception {
+    String tableName = "insertoverwritesmallercolumns";
+    tpch.execute("create table " + tableName + " (col1 int8, col2 int4, col3 float4)");
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc originalDesc = catalog.getTableDesc(tableName);
+
+    tpch.execute("insert overwrite into " + tableName + " select l_orderkey from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+    assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema());
+  }
+
+  @Test
+  public final void testInsertOverwriteWithTargetColumns() throws Exception {
+    String tableName = "InsertOverwriteWithTargetColumns";
+    tpch.execute("create table " + tableName + " (col1 int8, col2 int4, col3 float4)");
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc originalDesc = catalog.getTableDesc(tableName);
+
+    tpch.execute("insert overwrite into " + tableName + " (col1, col3) select l_orderkey, l_quantity from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+
+    ResultSet res = tpch.execute("select * from " + tableName);
+    assertTrue(res.next());
+    assertEquals(1, res.getLong(1));
+    assertEquals("null", res.getString(2));
+    assertTrue(17.0 == res.getFloat(3));
+
+    assertTrue(res.next());
+    assertEquals(1, res.getLong(1));
+    assertEquals("null", res.getString(2));
+    assertTrue(36.0 == res.getFloat(3));
+
+    assertTrue(res.next());
+    assertEquals(2, res.getLong(1));
+    assertEquals("null", res.getString(2));
+    assertTrue(38.0 == res.getFloat(3));
+
+    assertTrue(res.next());
+    assertEquals(3, res.getLong(1));
+    assertEquals("null", res.getString(2));
+    assertTrue(45.0 == res.getFloat(3));
+
+    assertTrue(res.next());
+    assertEquals(3, res.getLong(1));
+    assertEquals("null", res.getString(2));
+    assertTrue(49.0 == res.getFloat(3));
+
+    assertFalse(res.next());
+    res.close();
+
+    assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema());
+  }
+
+  @Test
+  public final void testInsertOverwriteWithAsterisk() throws Exception {
+    String tableName = "testinsertoverwritewithasterisk";
+    tpch.execute("create table " + tableName + " as select * from lineitem");
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    tpch.execute("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(2, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  @Test
+  public final void testInsertOverwriteIntoSelect() throws Exception {
+    String tableName = "insertoverwriteintoselect";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " as select l_orderkey from lineitem");
+    assertFalse(res.next());
+    res.close();
+
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc orderKeys = catalog.getTableDesc(tableName);
+    assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue());
+
+    // this query will result in the two rows.
+    res = tpch.execute(
+        "insert overwrite into " + tableName + " select l_orderkey from lineitem where l_orderkey = 3");
+    assertFalse(res.next());
+    res.close();
+
+    assertTrue(catalog.existsTable(tableName));
+    orderKeys = catalog.getTableDesc(tableName);
+    assertEquals(2, orderKeys.getMeta().getStat().getNumRows().intValue());
+  }
+
+  @Test
+  public final void testInsertOverwriteCapitalTableName() throws Exception {
+    String tableName = "testInsertOverwriteCapitalTableName";
+    tpch.execute("create table " + tableName + " as select * from lineitem");
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    tpch.execute("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(2, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  @Test
+  public final void testInsertOverwriteLocation() throws Exception {
+    tpch.execute("insert overwrite into location '/tajo-data/testInsertOverwriteCapitalTableName' select * from lineitem where l_orderkey = 3");
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(new Path("/tajo-data/testInsertOverwriteCapitalTableName")));
+    assertEquals(1, fs.listStatus(new Path("/tajo-data/testInsertOverwriteCapitalTableName")).length);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index e96f7e5..f2a2a71 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.query;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -44,7 +47,7 @@ public class TestSelectQuery {
   public static void setUp() throws Exception {
     tpch = TpchTestBase.getInstance();
   }
-  
+
   @Test
   public final void testSelect() throws Exception {
     ResultSet res = tpch.execute("select l_orderkey, l_partkey from lineitem");
@@ -277,7 +280,7 @@ public class TestSelectQuery {
 
     try {
       Map<Integer, String> result = Maps.newHashMap();
-      result.put(0, "NULL");
+      result.put(0, "null");
       result.put(1, "one");
       result.put(2, "two");
       result.put(3, "three");
@@ -406,15 +409,12 @@ public class TestSelectQuery {
   public final void testCreateAfterSelect() throws Exception {
     ResultSet res = tpch.execute(
         "create table orderkeys as select l_orderkey from lineitem");
-    try {
-      int count = 0;
-      for (;res.next();) {
-        count++;
-      }
-      assertEquals(count, 5);
-    } finally {
-      res.close();
-    }
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable("orderkeys"));
+    TableDesc orderKeys = catalog.getTableDesc("orderkeys");
+    assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue());
   }
 
   //@Test