You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/22 07:10:43 UTC

[1/2] tajo git commit: TAJO-1261: Separate query and ddl execution codes from GlobalEngine.

Repository: tajo
Updated Branches:
  refs/heads/master 341310746 -> a4c348423


http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 9ab4f0a..c96b86e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -37,6 +37,8 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
+import org.apache.tajo.master.exec.prehook.InsertIntoHook;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -390,10 +392,6 @@ public class QueryMasterTask extends CompositeService {
 
       optimizer.optimize(queryContext, plan);
 
-      GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
-      hookManager.addHook(new GlobalEngine.InsertHook());
-      hookManager.doHooks(queryContext, plan);
-
       for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
         LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
         if (scanNodes != null) {


[2/2] tajo git commit: TAJO-1261: Separate query and ddl execution codes from GlobalEngine.

Posted by hy...@apache.org.
TAJO-1261: Separate query and ddl execution codes from GlobalEngine.

Closes #312


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a4c34842
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a4c34842
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a4c34842

Branch: refs/heads/master
Commit: a4c3484232e139d4d21d0cfb9c31e5d784de652b
Parents: 3413107
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Dec 19 20:55:26 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 22 15:07:17 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +-
 .../org/apache/tajo/master/GlobalEngine.java    | 789 +------------------
 .../java/org/apache/tajo/master/TajoMaster.java |   2 +-
 .../tajo/master/TajoMasterClientService.java    |   9 +-
 .../apache/tajo/master/exec/DDLExecutor.java    | 434 ++++++++++
 .../apache/tajo/master/exec/QueryExecutor.java  | 391 +++++++++
 .../master/exec/prehook/CreateTableHook.java    |  53 ++
 .../exec/prehook/DistributedQueryHook.java      |  27 +
 .../prehook/DistributedQueryHookManager.java    |  45 ++
 .../master/exec/prehook/InsertIntoHook.java     |  61 ++
 .../master/querymaster/QueryMasterTask.java     |   6 +-
 11 files changed, 1056 insertions(+), 768 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e30e24c..36cff8a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,8 +24,11 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
-    TAJO-1247: Store type 'TEXTFILE' should be TEXT while keeping enum 'TEXTFILE' in protobuf.
-    (DaeMyung Kang via hyunsik)
+    TAJO-1261: Separate query and ddl execution codes from GlobalEngine. 
+    (hyunsik)
+
+    TAJO-1247: Store type 'TEXTFILE' should be TEXT while keeping enum 
+    'TEXTFILE' in protobuf. (DaeMyung Kang via hyunsik)
 
     TAJO-1221: HA TajoClient should not connect TajoMaster at the first. 
     (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 71b1f9b..d7e7670 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -19,66 +19,42 @@
 package org.apache.tajo.master;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
-import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryInfo;
-import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.exec.DDLExecutor;
+import org.apache.tajo.master.exec.QueryExecutor;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.plan.*;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.ProtoUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimeZone;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 
 public class GlobalEngine extends AbstractService {
@@ -96,11 +72,17 @@ public class GlobalEngine extends AbstractService {
   private LogicalPlanVerifier annotatedPlanVerifier;
   private DistributedQueryHookManager hookManager;
 
+  private QueryExecutor queryExecutor;
+  private DDLExecutor ddlExecutor;
+
   public GlobalEngine(final MasterContext context) {
     super(GlobalEngine.class.getName());
     this.context = context;
     this.catalog = context.getCatalog();
     this.sm = context.getStorageManager();
+
+    this.ddlExecutor = new DDLExecutor(context);
+    this.queryExecutor = new QueryExecutor(context, ddlExecutor);
   }
 
   public void start() {
@@ -110,10 +92,6 @@ public class GlobalEngine extends AbstractService {
       planner = new LogicalPlanner(context.getCatalog());
       optimizer = new LogicalOptimizer(context.getConf());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
-
-      hookManager = new DistributedQueryHookManager();
-      hookManager.addHook(new CreateTableHook());
-      hookManager.addHook(new InsertHook());
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
     }
@@ -148,6 +126,14 @@ public class GlobalEngine extends AbstractService {
     return annotatedPlanVerifier;
   }
 
+  public DDLExecutor getDDLExecutor() {
+    return ddlExecutor;
+  }
+
+  public QueryExecutor getQueryExecutor() {
+    return queryExecutor;
+  }
+
   private QueryContext createQueryContext(Session session) {
     QueryContext newQueryContext =  new QueryContext(context.getConf(), session);
 
@@ -168,12 +154,12 @@ public class GlobalEngine extends AbstractService {
       if (isJson) {
         planningContext = buildExpressionFromJson(query);
       } else {
-        planningContext = buildExpressionFromSql(queryContext, query);
+        planningContext = buildExpressionFromSql(query);
       }
 
       String jsonExpr = planningContext.toJson();
       LogicalPlan plan = createLogicalPlan(queryContext, planningContext);
-      SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, query, jsonExpr);
+      SubmitQueryResponse response = queryExecutor.execute(queryContext, session, query, jsonExpr, plan);
       return response;
     } catch (Throwable t) {
       context.getSystemMetrics().counter("Query", "errorQuery").inc();
@@ -197,288 +183,14 @@ public class GlobalEngine extends AbstractService {
     return JsonHelper.fromJson(json, Expr.class);
   }
 
-  public Expr buildExpressionFromSql(QueryContext queryContext, String sql)
-      throws InterruptedException, IOException, IllegalQueryStatusException {
+  public Expr buildExpressionFromSql(String sql) throws InterruptedException, IOException,
+      IllegalQueryStatusException {
     context.getSystemMetrics().counter("Query", "totalQuery").inc();
     return analyzer.parse(sql);
   }
 
-  private SubmitQueryResponse executeQueryInternal(QueryContext queryContext,
-                                                   Session session,
-                                                   LogicalPlan plan,
-                                                   String sql,
-                                                   String jsonExpr) throws Exception {
-
-    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-
-    SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
-    responseBuilder.setIsForwarded(false);
-    responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
-
-    if (PlannerUtil.checkIfSetSession(rootNode)) {
-
-      SetSessionNode setSessionNode = rootNode.getChild();
-
-      final String varName = setSessionNode.getName();
-
-      // SET CATALOG 'XXX'
-      if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
-        String databaseName = setSessionNode.getValue();
-
-        if (catalog.existDatabase(databaseName)) {
-          session.selectDatabase(setSessionNode.getValue());
-        } else {
-          responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-          responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
-          responseBuilder.setErrorMessage("database \"" + databaseName + "\" does not exists.");
-          return responseBuilder.build();
-        }
-
-        // others
-      } else {
-        if (setSessionNode.isDefaultValue()) {
-          session.removeVariable(varName);
-        } else {
-          session.setVariable(varName, setSessionNode.getValue());
-        }
-      }
-
-      context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
-    } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
-      context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
-      updateQuery(queryContext, rootNode.getChild());
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
-    } else if (plan.isExplain()) { // explain query
-      String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
-      Schema schema = new Schema();
-      schema.addColumn("explain", TajoDataTypes.Type.TEXT);
-      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-
-      SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
-
-      VTuple tuple = new VTuple(1);
-      String[] lines = explainStr.split("\n");
-      int bytesNum = 0;
-      for (String line : lines) {
-        tuple.put(0, DatumFactory.createText(line));
-        byte [] encodedData = encoder.toBytes(tuple);
-        bytesNum += encodedData.length;
-        serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
-      }
-      serializedResBuilder.setSchema(schema.getProto());
-      serializedResBuilder.setBytesNum(bytesNum);
-
-      responseBuilder.setResultSet(serializedResBuilder.build());
-      responseBuilder.setMaxRowNum(lines.length);
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-
-      // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
-    } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
-      ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
-      if (scanNode == null) {
-        scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
-      }
-      TableDesc desc = scanNode.getTableDesc();
-      int maxRow = Integer.MAX_VALUE;
-      if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
-        LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
-        maxRow = (int) limitNode.getFetchFirstNum();
-      }
-      if (desc.getStats().getNumRows() == 0) {
-        desc.getStats().setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
-      }
-
-      QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
-
-      NonForwardQueryResultScanner queryResultScanner =
-          new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
-
-      queryResultScanner.init();
-      session.addNonForwardQueryResultScanner(queryResultScanner);
-
-      responseBuilder.setQueryId(queryId.getProto());
-      responseBuilder.setMaxRowNum(maxRow);
-      responseBuilder.setTableDesc(desc.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
-      // NonFromQuery indicates a form of 'select a, x+y;'
-    } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
-      Target[] targets = plan.getRootBlock().getRawTargets();
-      if (targets == null) {
-        throw new PlanningException("No targets");
-      }
-      final Tuple outTuple = new VTuple(targets.length);
-      for (int i = 0; i < targets.length; i++) {
-        EvalNode eval = targets[i].getEvalTree();
-        outTuple.put(i, eval.eval(null, null));
-      }
-      boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
-      if (isInsert) {
-        InsertNode insertNode = rootNode.getChild();
-        insertNonFromQuery(queryContext, insertNode, responseBuilder);
-      } else {
-        Schema schema = PlannerUtil.targetToSchema(targets);
-        RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-        byte[] serializedBytes = encoder.toBytes(outTuple);
-        SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
-        serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
-        serializedResBuilder.setSchema(schema.getProto());
-        serializedResBuilder.setBytesNum(serializedBytes.length);
-
-        responseBuilder.setResultSet(serializedResBuilder);
-        responseBuilder.setMaxRowNum(1);
-        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-      }
-    } else { // it requires distributed execution. So, the query is forwarded to a query master.
-      StoreType storeType = PlannerUtil.getStoreType(plan);
-      if (storeType != null) {
-        StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
-        StorageProperty storageProperty = sm.getStorageProperty();
-        if (!storageProperty.isSupportsInsertInto()) {
-          throw new VerifyException("Inserting into non-file storage is not supported.");
-        }
-        sm.beforeInsertOrCATS(rootNode.getChild());
-      }
-      context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
-      hookManager.doHooks(queryContext, plan);
-
-      QueryJobManager queryJobManager = this.context.getQueryJobManager();
-      QueryInfo queryInfo;
-
-      queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
-
-      if(queryInfo == null) {
-        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-        responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
-        responseBuilder.setErrorMessage("Fail starting QueryMaster.");
-        LOG.error("Fail starting QueryMaster: " + sql);
-      } else {
-        responseBuilder.setIsForwarded(true);
-        responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
-        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-        if(queryInfo.getQueryMasterHost() != null) {
-          responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-        }
-        responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
-        LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
-            " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
-      }
-    }
-
-    responseBuilder.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));
-    SubmitQueryResponse response = responseBuilder.build();
-    return response;
-  }
-
-  private void insertNonFromQuery(QueryContext queryContext, InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
-      throws Exception {
-    String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
-    String queryId = nodeUniqName + "_" + System.currentTimeMillis();
-
-    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
-    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-
-    TableDesc tableDesc = null;
-    Path finalOutputDir = null;
-    if (insertNode.getTableName() != null) {
-      tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
-      finalOutputDir = new Path(tableDesc.getPath());
-    } else {
-      finalOutputDir = insertNode.getPath();
-    }
-
-    TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
-    taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
-
-    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
-    StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
-    try {
-      exec.init();
-      exec.next();
-    } finally {
-      exec.close();
-    }
-
-    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
-      // it moves the original table into the temporary location.
-      // Then it moves the new result table into the original table location.
-      // Upon failed, it recovers the original table if possible.
-      boolean movedToOldTable = false;
-      boolean committed = false;
-      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-      try {
-        if (fs.exists(finalOutputDir)) {
-          fs.rename(finalOutputDir, oldTableDir);
-          movedToOldTable = fs.exists(oldTableDir);
-        } else { // if the parent does not exist, make its parent directory.
-          fs.mkdirs(finalOutputDir.getParent());
-        }
-        fs.rename(stagingResultDir, finalOutputDir);
-        committed = fs.exists(finalOutputDir);
-      } catch (IOException ioe) {
-        // recover the old table
-        if (movedToOldTable && !committed) {
-          fs.rename(oldTableDir, finalOutputDir);
-        }
-      }
-    } else {
-      FileStatus[] files = fs.listStatus(stagingResultDir);
-      for (FileStatus eachFile: files) {
-        Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
-        if (fs.exists(targetFilePath)) {
-          targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
-        }
-        fs.rename(eachFile.getPath(), targetFilePath);
-      }
-    }
-
-    if (insertNode.hasTargetTable()) {
-      TableStats stats = tableDesc.getStats();
-      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-      stats.setNumBytes(volume);
-      stats.setNumRows(1);
-
-      UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
-      builder.setTableName(tableDesc.getName());
-      builder.setStats(stats.getProto());
-
-      catalog.updateTableStats(builder.build());
-
-      responseBuilder.setTableDesc(tableDesc.getProto());
-    } else {
-      TableStats stats = new TableStats();
-      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-      stats.setNumBytes(volume);
-      stats.setNumRows(1);
-
-      // Empty TableDesc
-      List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
-      CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
-          .setTableName(nodeUniqName)
-          .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
-          .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
-          .setStats(stats.getProto())
-          .build();
-
-      responseBuilder.setTableDesc(tableDescProto);
-    }
-
-    // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
-    responseBuilder.setMaxRowNum(-1);
-    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-  }
-
-
-  public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
+  public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException,
+      SQLException, PlanningException {
     try {
       LOG.info("SQL: " + sql);
 
@@ -496,7 +208,7 @@ public class GlobalEngine extends AbstractService {
       if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
         throw new SQLException("This is not update query:\n" + sql);
       } else {
-        updateQuery(queryContext, rootNode.getChild());
+        ddlExecutor.execute(queryContext, plan);
         return QueryIdFactory.NULL_QUERY_ID;
       }
     } catch (Exception e) {
@@ -505,44 +217,6 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  private boolean updateQuery(QueryContext queryContext, LogicalNode root) throws IOException {
-
-    switch (root.getType()) {
-      case SET_SESSION:
-
-      case CREATE_DATABASE:
-        CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
-        createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
-        return true;
-      case DROP_DATABASE:
-        DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
-        dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
-        return true;
-      case CREATE_TABLE:
-        CreateTableNode createTable = (CreateTableNode) root;
-        createTable(queryContext, createTable, createTable.isIfNotExists());
-        return true;
-      case DROP_TABLE:
-        DropTableNode dropTable = (DropTableNode) root;
-        dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
-        return true;
-      case ALTER_TABLESPACE:
-        AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
-        alterTablespace(queryContext, alterTablespace);
-        return true;
-      case ALTER_TABLE:
-        AlterTableNode alterTable = (AlterTableNode) root;
-        alterTable(queryContext,alterTable);
-        return true;
-      case TRUNCATE_TABLE:
-        TruncateTableNode truncateTable = (TruncateTableNode) root;
-        truncateTable(queryContext, truncateTable);
-        return true;
-      default:
-        throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
-    }
-  }
-
   private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws PlanningException {
 
     VerificationState state = new VerificationState();
@@ -599,403 +273,4 @@ public class GlobalEngine extends AbstractService {
       }
     }
   }
-
-  /**
-   * Alter a given table
-   */
-  public void alterTablespace(final QueryContext queryContext, final AlterTablespaceNode alterTablespace) {
-
-    final CatalogService catalog = context.getCatalog();
-    final String spaceName = alterTablespace.getTablespaceName();
-
-    AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
-    builder.setSpaceName(spaceName);
-    if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
-      AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
-          AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
-      commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
-      commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
-      commandBuilder.build();
-      builder.addCommand(commandBuilder);
-    } else {
-      throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
-    }
-
-    catalog.alterTablespace(builder.build());
-  }
-
-  /**
-   * Alter a given table
-   */
-  public void alterTable(final QueryContext queryContext, final AlterTableNode alterTable) throws IOException {
-
-    final CatalogService catalog = context.getCatalog();
-    final String tableName = alterTable.getTableName();
-
-    String databaseName;
-    String simpleTableName;
-    if (CatalogUtil.isFQTableName(tableName)) {
-      String[] split = CatalogUtil.splitFQTableName(tableName);
-      databaseName = split[0];
-      simpleTableName = split[1];
-    } else {
-      databaseName = queryContext.getCurrentDatabase();
-      simpleTableName = tableName;
-    }
-    final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
-    if (!catalog.existsTable(databaseName, simpleTableName)) {
-      throw new NoSuchTableException(qualifiedName);
-    }
-
-    switch (alterTable.getAlterTableOpType()) {
-      case RENAME_TABLE:
-        if (!catalog.existsTable(databaseName, simpleTableName)) {
-          throw new NoSuchTableException(alterTable.getTableName());
-        }
-        if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
-          throw new AlreadyExistsTableException(alterTable.getNewTableName());
-        }
-
-        TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
-
-        if (!desc.isExternal()) { // if the table is the managed table
-          Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
-              databaseName, simpleTableName);
-          Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
-              databaseName, alterTable.getNewTableName());
-          FileSystem fs = oldPath.getFileSystem(context.getConf());
-
-          if (!fs.exists(oldPath)) {
-            throw new IOException("No such a table directory: " + oldPath);
-          }
-          if (fs.exists(newPath)) {
-            throw new IOException("Already table directory exists: " + newPath);
-          }
-
-          fs.rename(oldPath, newPath);
-        }
-        catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
-            AlterTableType.RENAME_TABLE));
-        break;
-      case RENAME_COLUMN:
-        if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
-          throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
-        }
-        catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
-            alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
-        break;
-      case ADD_COLUMN:
-        if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
-          throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
-        }
-        catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
-        break;
-      default:
-        //TODO
-    }
-  }
-
-  /**
-   * Truncate table a given table
-   */
-  public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode)
-      throws IOException {
-    List<String> tableNames = truncateTableNode.getTableNames();
-    final CatalogService catalog = context.getCatalog();
-
-    String databaseName;
-    String simpleTableName;
-
-    List<TableDesc> tableDescList = new ArrayList<TableDesc>();
-    for (String eachTableName: tableNames) {
-      if (CatalogUtil.isFQTableName(eachTableName)) {
-        String[] split = CatalogUtil.splitFQTableName(eachTableName);
-        databaseName = split[0];
-        simpleTableName = split[1];
-      } else {
-        databaseName = queryContext.getCurrentDatabase();
-        simpleTableName = eachTableName;
-      }
-      final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
-      if (!catalog.existsTable(databaseName, simpleTableName)) {
-        throw new NoSuchTableException(qualifiedName);
-      }
-
-      Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
-      TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
-      Path tablePath = new Path(tableDesc.getPath());
-      if (tablePath.getParent() == null ||
-          !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
-        throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
-            ", warehouse dir=" + warehousePath);
-      }
-      tableDescList.add(tableDesc);
-    }
-
-    for (TableDesc eachTable: tableDescList) {
-      Path path = new Path(eachTable.getPath());
-      LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
-      FileSystem fs = path.getFileSystem(context.getConf());
-
-      FileStatus[] files = fs.listStatus(path);
-      if (files != null) {
-        for (FileStatus eachFile: files) {
-          fs.delete(eachFile.getPath(), true);
-        }
-      }
-    }
-  }
-
-  private boolean existColumnName(String tableName, String columnName) {
-    final TableDesc tableDesc = catalog.getTableDesc(tableName);
-    return tableDesc.getSchema().containsByName(columnName) ? true : false;
-  }
-
-  private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists) throws IOException {
-    TableMeta meta;
-
-    if (createTable.hasOptions()) {
-      meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions());
-    } else {
-      meta = CatalogUtil.newTableMeta(createTable.getStorageType());
-    }
-
-    if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
-      Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
-    }
-
-    return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
-        createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
-        createTable.getPartitionMethod(), ifNotExists);
-  }
-
-  public TableDesc createTable(QueryContext queryContext, String tableName, StoreType storeType,
-                               Schema schema, TableMeta meta, Path path, boolean isExternal,
-                               PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
-    String databaseName;
-    String simpleTableName;
-    if (CatalogUtil.isFQTableName(tableName)) {
-      String [] splitted = CatalogUtil.splitFQTableName(tableName);
-      databaseName = splitted[0];
-      simpleTableName = splitted[1];
-    } else {
-      databaseName = queryContext.getCurrentDatabase();
-      simpleTableName = tableName;
-    }
-    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
-    boolean exists = catalog.existsTable(databaseName, simpleTableName);
-
-    if (exists) {
-      if (ifNotExists) {
-        LOG.info("relation \"" + qualifiedName + "\" is already exists." );
-        return catalog.getTableDesc(databaseName, simpleTableName);
-      } else {
-        throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
-      }
-    }
-
-    TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
-        schema, meta, (path != null ? path.toUri(): null), isExternal);
-    
-    if (partitionDesc != null) {
-      desc.setPartitionMethod(partitionDesc);
-    }
-
-    StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
-
-    if (catalog.createTable(desc)) {
-      LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
-      return desc;
-    } else {
-      LOG.info("Table creation " + tableName + " is failed.");
-      throw new CatalogException("Cannot create table \"" + tableName + "\".");
-    }
-  }
-
-  public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName,
-                                @Nullable String tablespace,
-                                boolean ifNotExists) throws IOException {
-
-    String tablespaceName;
-    if (tablespace == null) {
-      tablespaceName = DEFAULT_TABLESPACE_NAME;
-    } else {
-      tablespaceName = tablespace;
-    }
-
-    // CREATE DATABASE IF NOT EXISTS
-    boolean exists = catalog.existDatabase(databaseName);
-    if (exists) {
-      if (ifNotExists) {
-        LOG.info("database \"" + databaseName + "\" is already exists." );
-        return true;
-      } else {
-        throw new AlreadyExistsDatabaseException(databaseName);
-      }
-    }
-
-    if (catalog.createDatabase(databaseName, tablespaceName)) {
-      String normalized = databaseName;
-      Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized);
-      FileSystem fs = databaseDir.getFileSystem(context.getConf());
-      fs.mkdirs(databaseDir);
-    }
-
-    return true;
-  }
-
-  public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) {
-
-    boolean exists = catalog.existDatabase(databaseName);
-    if(!exists) {
-      if (ifExists) { // DROP DATABASE IF EXISTS
-        LOG.info("database \"" + databaseName + "\" does not exists." );
-        return true;
-      } else { // Otherwise, it causes an exception.
-        throw new NoSuchDatabaseException(databaseName);
-      }
-    }
-
-    if (queryContext.getCurrentDatabase().equals(databaseName)) {
-      throw new RuntimeException("ERROR: Cannot drop the current open database");
-    }
-
-    boolean result = catalog.dropDatabase(databaseName);
-    LOG.info("database " + databaseName + " is dropped.");
-    return result;
-  }
-
-  /**
-   * Drop a given named table
-   *
-   * @param tableName to be dropped
-   * @param purge Remove all data if purge is true.
-   */
-  public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) {
-    CatalogService catalog = context.getCatalog();
-
-    String databaseName;
-    String simpleTableName;
-    if (CatalogUtil.isFQTableName(tableName)) {
-      String [] splitted = CatalogUtil.splitFQTableName(tableName);
-      databaseName = splitted[0];
-      simpleTableName = splitted[1];
-    } else {
-      databaseName = queryContext.getCurrentDatabase();
-      simpleTableName = tableName;
-    }
-    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
-    boolean exists = catalog.existsTable(qualifiedName);
-    if(!exists) {
-      if (ifExists) { // DROP TABLE IF EXISTS
-        LOG.info("relation \"" + qualifiedName + "\" is already exists." );
-        return true;
-      } else { // Otherwise, it causes an exception.
-        throw new NoSuchTableException(qualifiedName);
-      }
-    }
-
-    TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
-    catalog.dropTable(qualifiedName);
-
-    if (purge) {
-      try {
-        StorageManager.getStorageManager(queryContext.getConf(),
-            tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
-      } catch (IOException e) {
-        throw new InternalError(e.getMessage());
-      }
-    }
-
-    LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
-    return true;
-  }
-
-  public interface DistributedQueryHook {
-    boolean isEligible(QueryContext queryContext, LogicalPlan plan);
-    void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
-  }
-
-  public static class DistributedQueryHookManager {
-    private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
-    public void addHook(DistributedQueryHook hook) {
-      hooks.add(hook);
-    }
-
-    public void doHooks(QueryContext queryContext, LogicalPlan plan) {
-      for (DistributedQueryHook hook : hooks) {
-        if (hook.isEligible(queryContext, plan)) {
-          try {
-            hook.hook(queryContext, plan);
-          } catch (Throwable t) {
-            t.printStackTrace();
-          }
-        }
-      }
-    }
-  }
-
-  public class CreateTableHook implements DistributedQueryHook {
-
-    @Override
-    public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
-      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-      return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
-    }
-
-    @Override
-    public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
-      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-      CreateTableNode createTableNode = rootNode.getChild();
-      String [] splitted  = CatalogUtil.splitFQTableName(createTableNode.getTableName());
-      String databaseName = splitted[0];
-      String tableName = splitted[1];
-      queryContext.setOutputTable(tableName);
-      queryContext.setOutputPath(
-          StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
-      if(createTableNode.getPartitionMethod() != null) {
-        queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
-      }
-      queryContext.setCreateTable();
-    }
-  }
-
-  public static class InsertHook implements DistributedQueryHook {
-
-    @Override
-    public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
-      return plan.getRootBlock().getRootType() == NodeType.INSERT;
-    }
-
-    @Override
-  public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
-      queryContext.setInsert();
-
-      InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
-
-      // Set QueryContext settings, such as output table name and output path.
-      // It also remove data files if overwrite is true.
-      Path outputPath;
-      if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
-        queryContext.setOutputTable(insertNode.getTableName());
-        queryContext.setOutputPath(insertNode.getPath());
-        if (insertNode.hasPartition()) {
-          queryContext.setPartitionMethod(insertNode.getPartitionMethod());
-        }
-      } else { // INSERT INTO LOCATION ...
-        // When INSERT INTO LOCATION, must not set output table.
-        outputPath = insertNode.getPath();
-        queryContext.setFileOutput();
-        queryContext.setOutputPath(outputPath);
-      }
-
-      if (insertNode.isOverwrite()) {
-        queryContext.setOutputOverwrite();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index f307127..d021e43 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -371,7 +371,7 @@ public class TajoMaster extends CompositeService {
     }
 
     if (!catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
-      globalEngine.createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
+      globalEngine.getDDLExecutor().createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
     } else {
       LOG.info(String.format("Default database (%s) is already prepared.", DEFAULT_DATABASE_NAME));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7014034..ee99353 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -658,7 +658,7 @@ public class TajoMasterClientService extends AbstractService {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
         QueryContext queryContext = new QueryContext(conf, session);
 
-        if (context.getGlobalEngine().createDatabase(queryContext, request.getValue(), null, false)) {
+        if (context.getGlobalEngine().getDDLExecutor().createDatabase(queryContext, request.getValue(), null, false)) {
           return BOOL_TRUE;
         } else {
           return BOOL_FALSE;
@@ -688,7 +688,7 @@ public class TajoMasterClientService extends AbstractService {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
         QueryContext queryContext = new QueryContext(conf, session);
 
-        if (context.getGlobalEngine().dropDatabase(queryContext, request.getValue(), false)) {
+        if (context.getGlobalEngine().getDDLExecutor().dropDatabase(queryContext, request.getValue(), false)) {
           return BOOL_TRUE;
         } else {
           return BOOL_FALSE;
@@ -814,7 +814,7 @@ public class TajoMasterClientService extends AbstractService {
 
         TableDesc desc;
         try {
-          desc = context.getGlobalEngine().createTable(queryContext, request.getName(),
+          desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
               meta.getStoreType(), schema,
               meta, path, true, partitionDesc, false);
         } catch (Exception e) {
@@ -843,7 +843,8 @@ public class TajoMasterClientService extends AbstractService {
         Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId());
         QueryContext queryContext = new QueryContext(conf, session);
 
-        context.getGlobalEngine().dropTable(queryContext, dropTable.getName(), false, dropTable.getPurge());
+        context.getGlobalEngine().getDDLExecutor().dropTable(queryContext, dropTable.getName(), false,
+            dropTable.getPurge());
         return BOOL_TRUE;
       } catch (Throwable t) {
         throw new ServiceException(t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
new file mode 100644
index 0000000..acbaa01
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.AlterTablespaceSetType;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+/**
+ * Executor for DDL statements. They are executed only on the TajoMaster.
+ */
+public class DDLExecutor {
+  private static final Log LOG = LogFactory.getLog(DDLExecutor.class);
+
+  private final TajoMaster.MasterContext context;
+  private final CatalogService catalog;
+  private final StorageManager storageManager;
+
+  public DDLExecutor(TajoMaster.MasterContext context) {
+    this.context = context; // source of the configuration read via context.getConf()
+    this.catalog = context.getCatalog(); // looked up once; createDatabase/createTable use this field
+    this.storageManager = context.getStorageManager(); // NOTE(review): no visible method reads this field; confirm use
+  }
+
+  public boolean execute(QueryContext queryContext, LogicalPlan plan) throws IOException {
+    // The statement to run is the child of the plan's root node; dispatch on its node type.
+    final LogicalNode stmt = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();
+    switch (stmt.getType()) {
+    // Tablespace
+    case ALTER_TABLESPACE:
+      alterTablespace(context, queryContext, (AlterTablespaceNode) stmt);
+      break;
+
+    // Database
+    case CREATE_DATABASE: {
+      final CreateDatabaseNode node = (CreateDatabaseNode) stmt;
+      createDatabase(queryContext, node.getDatabaseName(), null, node.isIfNotExists());
+      break;
+    }
+    case DROP_DATABASE: {
+      final DropDatabaseNode node = (DropDatabaseNode) stmt;
+      dropDatabase(queryContext, node.getDatabaseName(), node.isIfExists());
+      break;
+    }
+
+    // Table
+    case CREATE_TABLE: {
+      final CreateTableNode node = (CreateTableNode) stmt;
+      createTable(queryContext, node, node.isIfNotExists());
+      break;
+    }
+    case DROP_TABLE: {
+      final DropTableNode node = (DropTableNode) stmt;
+      dropTable(queryContext, node.getTableName(), node.isIfExists(), node.isPurge());
+      break;
+    }
+    case TRUNCATE_TABLE:
+      truncateTable(queryContext, (TruncateTableNode) stmt);
+      break;
+    case ALTER_TABLE:
+      alterTable(context, queryContext, (AlterTableNode) stmt);
+      break;
+    default:
+      throw new InternalError("updateQuery cannot handle such query: \n" + stmt.toJson());
+    }
+    return true;
+  }
+
+  /**
+   * Alters a tablespace through the catalog. Only the LOCATION set type is handled;
+   * any other set type raises a RuntimeException. {@code queryContext} is not read here.
+   */
+  public static void alterTablespace(final TajoMaster.MasterContext context, final QueryContext queryContext,
+                                     final AlterTablespaceNode alterTablespace) {
+    final CatalogService catalog = context.getCatalog();
+    final String spaceName = alterTablespace.getTablespaceName();
+
+    AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
+    builder.setSpaceName(spaceName);
+    if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
+      AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
+          AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
+      commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
+      commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
+      // Build the command message once and attach that message to the request.
+      builder.addCommand(commandBuilder.build());
+    } else {
+      throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
+    }
+
+    catalog.alterTablespace(builder.build());
+  }
+
+  //--------------------------------------------------------------------------
+  // Database Section
+  //--------------------------------------------------------------------------
+
+  /**
+   * Registers a database in the catalog and makes its directory under the warehouse directory.
+   *
+   * @param queryContext not read by this method; may be null
+   * @param databaseName name of the database to create
+   * @param tablespace   tablespace name; DEFAULT_TABLESPACE_NAME is used when null
+   * @param ifNotExists  if true, an existing database is only logged; otherwise it is an error
+   * @return true on every path that does not throw
+   */
+  public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName,
+                                @Nullable String tablespace,
+                                boolean ifNotExists) throws IOException {
+    final String tablespaceName = (tablespace != null) ? tablespace : DEFAULT_TABLESPACE_NAME;
+
+    if (catalog.existDatabase(databaseName)) {
+      if (!ifNotExists) {
+        throw new AlreadyExistsDatabaseException(databaseName);
+      }
+      // CREATE DATABASE IF NOT EXISTS
+      LOG.info("database \"" + databaseName + "\" is already exists." );
+      return true;
+    }
+
+    if (catalog.createDatabase(databaseName, tablespaceName)) {
+      // The directory is made only after the catalog accepted the new database.
+      Path dbDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), databaseName);
+      dbDir.getFileSystem(context.getConf()).mkdirs(dbDir);
+    }
+
+    return true;
+  }
+
+  /** Drops a database from the catalog; refuses to drop the query context's current database. */
+  public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) {
+    if (!catalog.existDatabase(databaseName)) {
+      if (!ifExists) {
+        throw new NoSuchDatabaseException(databaseName);
+      }
+      // DROP DATABASE IF EXISTS: a missing database is not an error.
+      LOG.info("database \"" + databaseName + "\" does not exists." );
+      return true;
+    }
+
+    if (queryContext.getCurrentDatabase().equals(databaseName)) {
+      throw new RuntimeException("ERROR: Cannot drop the current open database");
+    }
+
+    final boolean dropped = catalog.dropDatabase(databaseName);
+    LOG.info("database " + databaseName + " is dropped.");
+    return dropped;
+  }
+
+  //--------------------------------------------------------------------------
+  // Table Section
+  //--------------------------------------------------------------------------
+
+  /**
+   * Derives the table meta from a CREATE TABLE node and delegates to the name-based overload.
+   * For a file storage type, an external table must carry a path (LOCATION).
+   */
+  private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists)
+      throws IOException {
+    final CatalogProtos.StoreType storeType = createTable.getStorageType();
+    final TableMeta meta = createTable.hasOptions()
+        ? CatalogUtil.newTableMeta(storeType, createTable.getOptions())
+        : CatalogUtil.newTableMeta(storeType);
+
+    if (PlannerUtil.isFileStorageType(storeType) && createTable.isExternal()) {
+      Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+    }
+
+    return createTable(queryContext, createTable.getTableName(), storeType, createTable.getTableSchema(), meta,
+        createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
+  }
+
+  /**
+   * Creates a table: resolves its database, asks the storage manager to create it, then registers it.
+   *
+   * @param tableName   simple or fully qualified name; a simple name uses the current database
+   * @param ifNotExists if true, an existing table's descriptor is returned instead of an error
+   * @return descriptor of the created table, or of the existing one when ifNotExists applies
+   */
+  public TableDesc createTable(QueryContext queryContext, String tableName, CatalogProtos.StoreType storeType,
+                               Schema schema, TableMeta meta, Path path, boolean isExternal,
+                               PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    if (catalog.existsTable(databaseName, simpleTableName)) {
+      if (!ifNotExists) {
+        // qualifiedName is used as is: re-qualifying the raw tableName could repeat the database part.
+        throw new AlreadyExistsTableException(qualifiedName);
+      }
+      LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+      return catalog.getTableDesc(databaseName, simpleTableName);
+    }
+
+    TableDesc desc = new TableDesc(qualifiedName, schema, meta, (path != null ? path.toUri(): null), isExternal);
+    if (partitionDesc != null) {
+      desc.setPartitionMethod(partitionDesc);
+    }
+    StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+    if (!catalog.createTable(desc)) {
+      LOG.info("Table creation " + tableName + " is failed.");
+      throw new CatalogException("Cannot create table \"" + tableName + "\".");
+    }
+    LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+    return desc;
+  }
+
+  /**
+   * Drops a table from the catalog and, if purge is set, purges it through its storage manager.
+   *
+   * @param tableName simple or fully qualified name of the table to be dropped
+   * @param ifExists  if true, a missing table is only logged and the call returns true
+   * @param purge     Remove all data if purge is true.
+   */
+  public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) {
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableName);
+      databaseName = splitted[0];
+      simpleTableName = splitted[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    if (!catalog.existsTable(qualifiedName)) {
+      if (!ifExists) {
+        throw new NoSuchTableException(qualifiedName);
+      }
+      // DROP TABLE IF EXISTS: a missing table is not an error.
+      LOG.info("relation \"" + qualifiedName + "\" does not exist." );
+      return true;
+    }
+
+    TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
+    catalog.dropTable(qualifiedName);
+
+    if (purge) {
+      try {
+        StorageManager.getStorageManager(queryContext.getConf(), tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
+      } catch (IOException e) {
+        // Same InternalError type as before, but the I/O failure is retained as its cause.
+        InternalError error = new InternalError(e.getMessage());
+        error.initCause(e);
+        throw error;
+      }
+    }
+
+    LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
+    return true;
+  }
+
+  /**
+   * Truncates the listed tables by deleting every entry directly under each table path.
+   * All tables are validated first; nothing is deleted if any of them fails a check.
+   */
+  public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode)
+      throws IOException {
+    final List<String> tableNames = truncateTableNode.getTableNames();
+    final CatalogService catalog = context.getCatalog();
+    final List<TableDesc> targets = new ArrayList<TableDesc>();
+
+    for (String eachTableName : tableNames) {
+      final String databaseName;
+      final String simpleTableName;
+      if (CatalogUtil.isFQTableName(eachTableName)) {
+        String[] split = CatalogUtil.splitFQTableName(eachTableName);
+        databaseName = split[0];
+        simpleTableName = split[1];
+      } else {
+        databaseName = queryContext.getCurrentDatabase();
+        simpleTableName = eachTableName;
+      }
+
+      if (!catalog.existsTable(databaseName, simpleTableName)) {
+        throw new NoSuchTableException(CatalogUtil.buildFQName(databaseName, simpleTableName));
+      }
+
+      // Only a table whose path lies directly under <warehouse dir>/<database> may be truncated.
+      final Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
+      final TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
+      final Path tablePath = new Path(tableDesc.getPath());
+      final Path parent = tablePath.getParent();
+      if (parent == null || !parent.toUri().getPath().equals(warehousePath.toUri().getPath())) {
+        throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
+            ", warehouse dir=" + warehousePath);
+      }
+      targets.add(tableDesc);
+    }
+
+    for (TableDesc eachTable : targets) {
+      final Path path = new Path(eachTable.getPath());
+      LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
+      final FileSystem fs = path.getFileSystem(context.getConf());
+      final FileStatus[] files = fs.listStatus(path);
+      if (files == null) {
+        continue;
+      }
+      for (FileStatus eachFile : files) {
+        fs.delete(eachFile.getPath(), true);
+      }
+    }
+  }
+
+  /**
+   * Executes an ALTER TABLE statement: RENAME TABLE, RENAME COLUMN or ADD COLUMN.
+   * Any other operation type is currently ignored.
+   *
+   * For RENAME TABLE on a table that is not external, the table directory below the
+   * warehouse directory is renamed before the catalog is updated.
+   *
+   * @param context      master context that provides the catalog and configuration
+   *                     (note: this parameter shadows the field of the same name)
+   * @param queryContext provides the current database for unqualified table names
+   * @param alterTable   plan node describing the alteration
+   * @throws IOException                     if the table directory is missing, the target directory
+   *                                         already exists, or the directory cannot be renamed
+   * @throws NoSuchTableException            if the table to alter does not exist
+   * @throws AlreadyExistsTableException     if the new table name is already taken
+   * @throws ColumnNameAlreadyExistException if the new or added column name already exists
+   */
+  public void alterTable(TajoMaster.MasterContext context, final QueryContext queryContext,
+                         final AlterTableNode alterTable) throws IOException {
+
+    final CatalogService catalog = context.getCatalog();
+    final String tableName = alterTable.getTableName();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String[] split = CatalogUtil.splitFQTableName(tableName);
+      databaseName = split[0];
+      simpleTableName = split[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+    final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+    // Every operation below requires the table to exist, so check once up front.
+    if (!catalog.existsTable(databaseName, simpleTableName)) {
+      throw new NoSuchTableException(qualifiedName);
+    }
+
+    switch (alterTable.getAlterTableOpType()) {
+    case RENAME_TABLE:
+      if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
+        throw new AlreadyExistsTableException(alterTable.getNewTableName());
+      }
+
+      TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
+
+      if (!desc.isExternal()) { // if the table is the managed table
+        Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+            databaseName, simpleTableName);
+        Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+            databaseName, alterTable.getNewTableName());
+        FileSystem fs = oldPath.getFileSystem(context.getConf());
+
+        if (!fs.exists(oldPath)) {
+          throw new IOException("No such a table directory: " + oldPath);
+        }
+        if (fs.exists(newPath)) {
+          throw new IOException("Already table directory exists: " + newPath);
+        }
+
+        // rename() reports failure through its return value; do not update the catalog
+        // if the directory was not actually moved.
+        if (!fs.rename(oldPath, newPath)) {
+          throw new IOException("Cannot rename table directory from " + oldPath + " to " + newPath);
+        }
+      }
+      catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
+          AlterTableType.RENAME_TABLE));
+      break;
+    case RENAME_COLUMN:
+      if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
+        throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
+      }
+      catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
+          alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
+      break;
+    case ADD_COLUMN:
+      if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
+        throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
+      }
+      catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
+      break;
+    default:
+      //TODO other operation types are not handled yet
+      break;
+    }
+  }
+
+  /**
+   * Tells whether the schema of the given table contains a column with the given name.
+   *
+   * @param tableName  name used to look up the table description in the catalog
+   * @param columnName column name to look for
+   * @return the result of the schema's containsByName check
+   */
+  private boolean existColumnName(String tableName, String columnName) {
+    return catalog.getTableDesc(tableName).getSchema().containsByName(columnName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
new file mode 100644
index 0000000..3585ae7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.StoreTableExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.master.NonForwardQueryResultScanner;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.exec.prehook.CreateTableHook;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
+import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+import org.apache.tajo.master.querymaster.*;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryExecutor {
+  private static final Log LOG = LogFactory.getLog(QueryExecutor.class);
+
+  private final TajoMaster.MasterContext context;
+  private final CatalogService catalog;
+  private final DistributedQueryHookManager hookManager;
+  private final DDLExecutor ddlExecutor;
+
+  public QueryExecutor(TajoMaster.MasterContext context, DDLExecutor ddlExecutor) {
+    this.context = context;
+    this.catalog = context.getCatalog();
+
+    this.ddlExecutor = ddlExecutor;
+    this.hookManager = new DistributedQueryHookManager();
+    this.hookManager.addHook(new CreateTableHook());
+    this.hookManager.addHook(new InsertIntoHook());
+  }
+
+  /**
+   * Routes a planned statement to the matching execution path and builds the response.
+   *
+   * The categories are tested in a fixed order and the first match wins: session variable
+   * setting, DDL, EXPLAIN, simple query, non-from query, and finally distributed execution.
+   * The session variables are attached to the response in every case.
+   *
+   * @param queryContext context of the query being executed
+   * @param session      session that issued the statement
+   * @param sql          original SQL text
+   * @param jsonExpr     JSON form of the parsed expression, passed on for distributed queries
+   * @param plan         logical plan of the statement
+   * @return the response to send back to the client
+   * @throws Exception if the selected execution path fails
+   */
+  public SubmitQueryResponse execute(QueryContext queryContext, Session session, String sql, String jsonExpr,
+                      LogicalPlan plan) throws Exception {
+
+    final SubmitQueryResponse.Builder builder = SubmitQueryResponse.newBuilder();
+    builder.setIsForwarded(false);
+    builder.setUserName(queryContext.get(SessionVars.USERNAME));
+
+    final LogicalRootNode root = plan.getRootBlock().getRoot();
+
+    if (PlannerUtil.checkIfSetSession(root)) {
+      // SET <variable> ...
+      execSetSession(session, plan, builder);
+    } else if (PlannerUtil.checkIfDDLPlan(root)) {
+      // DDL is executed here and answered without a real query id.
+      context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+      ddlExecutor.execute(queryContext, plan);
+      builder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      builder.setResultCode(ClientProtos.ResultCode.OK);
+    } else if (plan.isExplain()) {
+      // explain query
+      execExplain(plan, builder);
+    } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
+      // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
+      execSimpleQuery(queryContext, session, sql, plan, builder);
+    } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
+      // NonFromQuery indicates a form of 'select a, x+y;'
+      execNonFromQuery(queryContext, session, sql, plan, builder);
+    } else {
+      // it requires distributed execution. So, the query is forwarded to a query master.
+      executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, builder);
+    }
+
+    builder.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));
+    return builder.build();
+  }
+
+  public void execSetSession(Session session, LogicalPlan plan,
+                             SubmitQueryResponse.Builder response) {
+    SetSessionNode setSessionNode = ((LogicalRootNode)plan.getRootBlock().getRoot()).getChild();
+
+    final String varName = setSessionNode.getName();
+
+    // SET CATALOG 'XXX'
+    if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
+      String databaseName = setSessionNode.getValue();
+
+      if (catalog.existDatabase(databaseName)) {
+        session.selectDatabase(setSessionNode.getValue());
+      } else {
+        response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        response.setResultCode(ClientProtos.ResultCode.ERROR);
+        response.setErrorMessage("database \"" + databaseName + "\" does not exists.");
+      }
+
+      // others
+    } else {
+      if (setSessionNode.isDefaultValue()) {
+        session.removeVariable(varName);
+      } else {
+        session.setVariable(varName, setSessionNode.getValue());
+      }
+    }
+
+    context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+    response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+    response.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+  /**
+   * Answers an EXPLAIN statement by returning the explain string as a result set with a
+   * single TEXT column named "explain", one row per line of the string.
+   *
+   * @param plan     logical plan to be explained
+   * @param response response builder that receives the serialized result set
+   * @throws IOException declared by the signature
+   */
+  public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
+
+    final Schema explainSchema = new Schema();
+    explainSchema.addColumn("explain", TajoDataTypes.Type.TEXT);
+    final RowStoreUtil.RowStoreEncoder rowEncoder = RowStoreUtil.createEncoder(explainSchema);
+
+    final String[] planLines = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot()).split("\n");
+    final ClientProtos.SerializedResultSet.Builder resultSet = ClientProtos.SerializedResultSet.newBuilder();
+
+    // A single tuple instance is reused for every line.
+    final VTuple row = new VTuple(1);
+    int totalBytes = 0;
+    for (int i = 0; i < planLines.length; i++) {
+      row.put(0, DatumFactory.createText(planLines[i]));
+      final byte[] encoded = rowEncoder.toBytes(row);
+      totalBytes += encoded.length;
+      resultSet.addSerializedTuples(ByteString.copyFrom(encoded));
+    }
+    resultSet.setSchema(explainSchema.getProto());
+    resultSet.setBytesNum(totalBytes);
+
+    response.setResultSet(resultSet.build());
+    response.setMaxRowNum(planLines.length);
+    response.setResultCode(ClientProtos.ResultCode.OK);
+    response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+  }
+
+  /**
+   * Serves a simple query (a form of 'select * from tb_name [LIMIT X];') directly from
+   * the master through a NonForwardQueryResultScanner registered in the session.
+   *
+   * @param queryContext context of the query (not used by this method)
+   * @param session      session that owns the created result scanner
+   * @param query        original SQL text (not used by this method)
+   * @param plan         logical plan containing a SCAN or PARTITIONS_SCAN node
+   * @param response     response builder that receives query id, row limit and table description
+   * @throws Exception if the result scanner cannot be initialized
+   */
+  public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
+                              SubmitQueryResponse.Builder response) throws Exception {
+    ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+    if (scanNode == null) {
+      scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+    }
+    TableDesc desc = scanNode.getTableDesc();
+    int maxRow = Integer.MAX_VALUE;
+    if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+      LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+      // Clamp instead of a bare narrowing cast so that a limit larger than
+      // Integer.MAX_VALUE cannot wrap around to a small or negative row count.
+      maxRow = (int) Math.min(Integer.MAX_VALUE, limitNode.getFetchFirstNum());
+    }
+    // A row count of zero is replaced with the "unknown" marker.
+    if (desc.getStats().getNumRows() == 0) {
+      desc.getStats().setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    }
+
+    QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+    NonForwardQueryResultScanner queryResultScanner =
+        new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+
+    queryResultScanner.init();
+    session.addNonForwardQueryResultScanner(queryResultScanner);
+
+    response.setQueryId(queryId.getProto());
+    response.setMaxRowNum(maxRow);
+    response.setTableDesc(desc.getProto());
+    response.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+  /**
+   * Executes a query without a FROM clause (a form of 'select a, x+y;').
+   *
+   * The targets are evaluated first. If the root's child is an INSERT node the work is
+   * handed to insertNonFromQuery; otherwise the single evaluated row is returned inline.
+   *
+   * @param queryContext    context of the query
+   * @param session         issuing session (not used by this method)
+   * @param query           original SQL text (not used by this method)
+   * @param plan            logical plan of the statement
+   * @param responseBuilder response builder that receives the result
+   * @throws PlanningException if the root block has no targets
+   * @throws Exception         if evaluation or the insert fails
+   */
+  public void execNonFromQuery(QueryContext queryContext, Session session, String query,
+                               LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception {
+    final LogicalRootNode root = plan.getRootBlock().getRoot();
+
+    final Target[] rawTargets = plan.getRootBlock().getRawTargets();
+    if (rawTargets == null) {
+      throw new PlanningException("No targets");
+    }
+
+    final Tuple row = new VTuple(rawTargets.length);
+    int column = 0;
+    for (Target target : rawTargets) {
+      final EvalNode expr = target.getEvalTree();
+      row.put(column++, expr.eval(null, null));
+    }
+
+    if (root.getChild() != null && root.getChild().getType() == NodeType.INSERT) {
+      final InsertNode insert = root.getChild();
+      insertNonFromQuery(queryContext, insert, responseBuilder);
+      return;
+    }
+
+    final Schema rowSchema = PlannerUtil.targetToSchema(rawTargets);
+    final byte[] encodedRow = RowStoreUtil.createEncoder(rowSchema).toBytes(row);
+
+    final ClientProtos.SerializedResultSet.Builder resultSet = ClientProtos.SerializedResultSet.newBuilder();
+    resultSet.addSerializedTuples(ByteString.copyFrom(encodedRow));
+    resultSet.setSchema(rowSchema.getProto());
+    resultSet.setBytesNum(encodedRow.length);
+
+    responseBuilder.setResultSet(resultSet);
+    responseBuilder.setMaxRowNum(1);
+    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+  /**
+   * Executes an INSERT whose source is a query without a FROM clause.
+   *
+   * The row is written to a staging directory and then moved to the final output
+   * directory: either the path of the target table or the path given in the insert node.
+   * For a target table, its statistics are updated in the catalog afterwards.
+   *
+   * @param queryContext    context of the query
+   * @param insertNode      insert node describing the target and the overwrite flag
+   * @param responseBuilder response builder that receives the table description and result code
+   * @throws Exception if writing the row fails, or if moving the result into place fails
+   *                   during INSERT OVERWRITE (after an attempt to restore the old table)
+   */
+  private void insertNonFromQuery(QueryContext queryContext,
+                                  InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
+      throws Exception {
+    String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
+    String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId, queryContext);
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+    TableDesc tableDesc = null;
+    Path finalOutputDir = null;
+    if (insertNode.getTableName() != null) {
+      tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+      finalOutputDir = new Path(tableDesc.getPath());
+    } else {
+      finalOutputDir = insertNode.getPath();
+    }
+
+    TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+    taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+
+    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+    StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+    try {
+      exec.init();
+      exec.next();
+    } finally {
+      exec.close();
+    }
+
+    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+      // it moves the original table into the temporary location.
+      // Then it moves the new result table into the original table location.
+      // Upon failed, it recovers the original table if possible.
+      boolean movedToOldTable = false;
+      boolean committed = false;
+      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+      try {
+        if (fs.exists(finalOutputDir)) {
+          fs.rename(finalOutputDir, oldTableDir);
+          movedToOldTable = fs.exists(oldTableDir);
+        } else { // if the parent does not exist, make its parent directory.
+          fs.mkdirs(finalOutputDir.getParent());
+        }
+        fs.rename(stagingResultDir, finalOutputDir);
+        committed = fs.exists(finalOutputDir);
+      } catch (IOException ioe) {
+        // recover the old table
+        if (movedToOldTable && !committed) {
+          fs.rename(oldTableDir, finalOutputDir);
+        }
+        // Propagate the failure: otherwise the code below would update statistics
+        // and report OK for an insert that did not happen.
+        throw ioe;
+      }
+    } else {
+      FileStatus[] files = fs.listStatus(stagingResultDir);
+      for (FileStatus eachFile: files) {
+        Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+        // Avoid clobbering an existing file by appending a timestamp to the name.
+        if (fs.exists(targetFilePath)) {
+          targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+        }
+        fs.rename(eachFile.getPath(), targetFilePath);
+      }
+    }
+
+    if (insertNode.hasTargetTable()) {
+      TableStats stats = tableDesc.getStats();
+      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+      stats.setNumBytes(volume);
+      stats.setNumRows(1);
+
+      CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+      builder.setTableName(tableDesc.getName());
+      builder.setStats(stats.getProto());
+
+      catalog.updateTableStats(builder.build());
+
+      responseBuilder.setTableDesc(tableDesc.getProto());
+    } else {
+      TableStats stats = new TableStats();
+      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+      stats.setNumBytes(volume);
+      stats.setNumRows(1);
+
+      // Empty TableDesc
+      List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+      CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+          .setTableName(nodeUniqName)
+          .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
+          .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+          .setStats(stats.getProto())
+          .build();
+
+      responseBuilder.setTableDesc(tableDescProto);
+    }
+
+    // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+    responseBuilder.setMaxRowNum(-1);
+    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+  /**
+   * Hands a query that needs distributed execution over to the query job manager.
+   *
+   * If the plan has a store type, the matching storage manager must support INSERT INTO
+   * and is notified before the job is created. The registered hooks then run on the plan.
+   *
+   * @param queryContext    context of the query
+   * @param session         issuing session
+   * @param plan            logical plan of the query
+   * @param sql             original SQL text
+   * @param jsonExpr        JSON form of the parsed expression
+   * @param responseBuilder response builder that receives the query id and query master address
+   * @throws VerifyException if the storage does not support INSERT INTO
+   * @throws Exception       if preparing or submitting the job fails
+   */
+  public void executeDistributedQuery(QueryContext queryContext, Session session,
+                                      LogicalPlan plan,
+                                      String sql,
+                                      String jsonExpr,
+                                      SubmitQueryResponse.Builder responseBuilder) throws Exception {
+    final LogicalRootNode root = plan.getRootBlock().getRoot();
+
+    final CatalogProtos.StoreType outputStoreType = PlannerUtil.getStoreType(plan);
+    if (outputStoreType != null) {
+      final StorageManager storage = StorageManager.getStorageManager(context.getConf(), outputStoreType);
+      final StorageProperty property = storage.getStorageProperty();
+      if (!property.isSupportsInsertInto()) {
+        throw new VerifyException("Inserting into non-file storage is not supported.");
+      }
+      storage.beforeInsertOrCATS(root.getChild());
+    }
+    context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
+    hookManager.doHooks(queryContext, plan);
+
+    final QueryInfo submitted =
+        this.context.getQueryJobManager().createNewQueryJob(session, queryContext, sql, jsonExpr, root);
+
+    // No query info means that the job could not be started.
+    if (submitted == null) {
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+      responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+      LOG.error("Fail starting QueryMaster: " + sql);
+      return;
+    }
+
+    responseBuilder.setIsForwarded(true);
+    responseBuilder.setQueryId(submitted.getQueryId().getProto());
+    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+    final String masterHost = submitted.getQueryMasterHost();
+    if (masterHost != null) {
+      responseBuilder.setQueryMasterHost(submitted.getQueryMasterHost());
+    }
+    responseBuilder.setQueryMasterPort(submitted.getQueryMasterClientPort());
+    LOG.info("Query " + submitted.getQueryId().toString() + "," + submitted.getSql() + "," +
+        " is forwarded to " + submitted.getQueryMasterHost() + ":" + submitted.getQueryMasterPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
new file mode 100644
index 0000000..5e3d0b6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.storage.StorageUtil;
+
+/**
+ * Hook for plans whose root child is a CREATE TABLE node. It records the output table,
+ * the output path below the warehouse directory, the partition method (if any) and the
+ * create-table flag in the query context.
+ */
+public class CreateTableHook implements DistributedQueryHook {
+
+  /** Returns true when the child of the plan's root node is of type CREATE_TABLE. */
+  @Override
+  public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+    final LogicalRootNode root = plan.getRootBlock().getRoot();
+    return NodeType.CREATE_TABLE == root.getChild().getType();
+  }
+
+  /**
+   * Copies the output settings of the CREATE TABLE node into the query context.
+   * The table name of the node is split as a fully qualified name.
+   */
+  @Override
+  public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+    final LogicalRootNode root = plan.getRootBlock().getRoot();
+    final CreateTableNode createTable = root.getChild();
+
+    final String[] nameParts = CatalogUtil.splitFQTableName(createTable.getTableName());
+    final String database = nameParts[0];
+    final String table = nameParts[1];
+
+    queryContext.setOutputTable(table);
+    queryContext.setOutputPath(
+        StorageUtil.concatPath(TajoConf.getWarehouseDir(queryContext.getConf()), database, table));
+
+    if (createTable.getPartitionMethod() != null) {
+      queryContext.setPartitionMethod(createTable.getPartitionMethod());
+    }
+    queryContext.setCreateTable();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
new file mode 100644
index 0000000..77c6ff4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+
+/**
+ * A hook that DistributedQueryHookManager runs against a logical plan. The manager
+ * calls {@link #isEligible} for each registered hook and calls {@link #hook} only
+ * when it returned true.
+ */
+public interface DistributedQueryHook {
+  /** Tells whether this hook should run for the given query context and plan. */
+  boolean isEligible(QueryContext queryContext, LogicalPlan plan);
+  /** Performs the hook's work for an eligible plan; may throw any exception on failure. */
+  void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
new file mode 100644
index 0000000..3dba176
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Keeps a list of {@link DistributedQueryHook}s and runs the eligible ones, in
+ * registration order, against a logical plan.
+ */
+public class DistributedQueryHookManager {
+  private static final Log LOG = LogFactory.getLog(DistributedQueryHookManager.class);
+
+  private final List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
+
+  /**
+   * Registers a hook; hooks run in the order in which they were added.
+   *
+   * @param hook hook to register
+   */
+  public void addHook(DistributedQueryHook hook) {
+    hooks.add(hook);
+  }
+
+  /**
+   * Runs every registered hook that reports itself eligible for the given plan.
+   * A failing hook is logged and does not stop the remaining hooks; the failure is
+   * not propagated to the caller.
+   *
+   * @param queryContext context handed to each hook
+   * @param plan         logical plan handed to each hook
+   */
+  public void doHooks(QueryContext queryContext, LogicalPlan plan) {
+    for (DistributedQueryHook hook : hooks) {
+      if (hook.isEligible(queryContext, plan)) {
+        try {
+          hook.hook(queryContext, plan);
+        } catch (Throwable t) {
+          // Report through the logger instead of stderr so the failure reaches the master log.
+          LOG.error("Distributed query hook " + hook.getClass().getName() + " failed", t);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
new file mode 100644
index 0000000..14c4d8d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.NodeType;
+
+/**
+ * A {@link DistributedQueryHook} that copies the output settings of an INSERT
+ * plan (target table or location, partition method, overwrite flag) into the
+ * {@link QueryContext}.
+ */
+public class InsertIntoHook implements DistributedQueryHook {
+
+  /**
+   * @return true when the root type of the plan's root block is {@code NodeType.INSERT}
+   */
+  @Override
+  public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+    final NodeType rootType = plan.getRootBlock().getRootType();
+    return rootType == NodeType.INSERT;
+  }
+
+  /**
+   * Marks the query as an insert and transfers the insert node's output settings
+   * to the query context. This method only records settings on the context; in
+   * particular, for an overwriting insert it just sets the overwrite flag.
+   *
+   * @param queryContext the context that receives the output settings
+   * @param plan the plan whose root block holds the INSERT node
+   */
+  @Override
+  public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+    queryContext.setInsert();
+
+    final InsertNode insert = plan.getRootBlock().getNode(NodeType.INSERT);
+
+    if (!insert.hasTargetTable()) {
+      // INSERT INTO LOCATION ...: the output table must not be set in this case.
+      final Path location = insert.getPath();
+      queryContext.setFileOutput();
+      queryContext.setOutputPath(location);
+    } else {
+      // INSERT INTO [TB_NAME]
+      queryContext.setOutputTable(insert.getTableName());
+      queryContext.setOutputPath(insert.getPath());
+      if (insert.hasPartition()) {
+        queryContext.setPartitionMethod(insert.getPartitionMethod());
+      }
+    }
+
+    if (insert.isOverwrite()) {
+      queryContext.setOutputOverwrite();
+    }
+  }
+}