You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:38 UTC

[40/48] git commit: TAJO-803: INSERT INTO without FROM throws ClassCastException. (Hyoungjun Kim via hyunsik)

TAJO-803: INSERT INTO without FROM throws ClassCastException. (Hyoungjun Kim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8e800469
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8e800469
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8e800469

Branch: refs/heads/window_function
Commit: 8e8004698f4bd891a415fcc62add59998ced2ae3
Parents: 9bbf87e
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon May 12 16:31:03 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon May 12 16:31:03 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  12 +-
 .../java/org/apache/tajo/client/TajoClient.java |  11 +-
 .../tajo/engine/planner/LogicalPlanner.java     |  39 ++---
 .../engine/planner/physical/EvalExprExec.java   |  22 ++-
 .../engine/planner/physical/StoreTableExec.java |   7 +-
 .../org/apache/tajo/master/GlobalEngine.java    | 147 ++++++++++++++++---
 .../apache/tajo/master/querymaster/Query.java   |  10 +-
 .../master/querymaster/QueryMasterTask.java     |  91 +++++++-----
 .../apache/tajo/worker/TaskAttemptContext.java  |  18 ++-
 .../tajo/engine/query/TestInsertQuery.java      |  94 ++++++++++++
 11 files changed, 351 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 17af7bc..47884b8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-803: INSERT INTO without FROM throws ClassCastException.
+    (Hyoungjun Kim via hyunsik)
+
     TAJO-813: CLI should support comment character with multi-line query.
     (Hyoungjun Kim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index c7368c9..e0ca62a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -404,10 +404,18 @@ public class TajoCli {
   private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long startTime) {
     ResultSet res = null;
     try {
-      res = TajoClient.createResultSet(client, response);
+      QueryId queryId = new QueryId(response.getQueryId());
       float responseTime = ((float)(System.currentTimeMillis() - startTime) / 1000.0f);
       TableDesc desc = new TableDesc(response.getTableDesc());
-      outputFormatter.printResult(sout, sin, desc, responseTime, res);
+
+      // non-forwarded INSERT INTO query does not have any query id.
+      // In this case, it just returns succeeded query information without printing the query results.
+      if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        outputFormatter.printResult(sout, sin, desc, responseTime, res);
+      } else {
+        res = TajoClient.createResultSet(client, response);
+        outputFormatter.printResult(sout, sin, desc, responseTime, res);
+      }
     } catch (Throwable t) {
       outputFormatter.printErrorMessage(sout, t);
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 9e45bf0..7d84592 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -354,10 +354,15 @@ public class TajoClient implements Closeable {
         return this.getQueryResultAndWait(queryId);
       }
     } else {
-      if (response.hasResultSet() || response.hasTableDesc()) {
-        return createResultSet(this, response);
-      } else {
+      // If a non-forwarded insert into query
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() < 0) {
         return this.createNullResultSet(queryId);
+      } else {
+        if (response.hasResultSet() || response.hasTableDesc()) {
+          return createResultSet(this, response);
+        } else {
+          return this.createNullResultSet(queryId);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 52c9782..d5d2d47 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -1244,25 +1244,30 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     Schema tableSchema = insertNode.getTableSchema();
     Schema targetColumns = insertNode.getTargetSchema();
 
-    ProjectionNode projectionNode = insertNode.getChild();
-
-    // Modifying projected columns by adding NULL constants
-    // It is because that table appender does not support target columns to be written.
-    List<Target> targets = TUtil.newList();
-    for (int i = 0, j = 0; i < tableSchema.size(); i++) {
-      Column column = tableSchema.getColumn(i);
-
-      if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
-        targets.add(projectionNode.getTargets()[j++]);
-      } else {
-        targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
+    LogicalNode child = insertNode.getChild();
+    if (child instanceof Projectable) {
+      Projectable projectionNode = (Projectable) insertNode.getChild();
+
+      // Modifying projected columns by adding NULL constants
+      // It is because that table appender does not support target columns to be written.
+      List<Target> targets = TUtil.newList();
+      for (int i = 0, j = 0; i < tableSchema.size(); i++) {
+        Column column = tableSchema.getColumn(i);
+
+        if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
+          targets.add(projectionNode.getTargets()[j++]);
+        } else {
+          targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
+        }
       }
-    }
-    projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
+      projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
 
-    insertNode.setInSchema(projectionNode.getOutSchema());
-    insertNode.setOutSchema(projectionNode.getOutSchema());
-    insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+      insertNode.setInSchema(projectionNode.getOutSchema());
+      insertNode.setOutSchema(projectionNode.getOutSchema());
+      insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+    } else {
+      throw new RuntimeException("Wrong child node type: " +  child.getType() + " for insert");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index a843bce..b1ab7c4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 public class EvalExprExec extends PhysicalExec {
   private final EvalExprNode plan;
   private float progress;
+  private boolean executedOnce = false;
 
   public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -41,17 +42,24 @@ public class EvalExprExec extends PhysicalExec {
   }
 
   @Override
-  public Tuple next() throws IOException {    
-    Target [] targets = plan.getTargets();
-    Tuple t = new VTuple(targets.length);
-    for (int i = 0; i < targets.length; i++) {
-      t.put(i, targets[i].getEvalTree().eval(inSchema, null));
+  public Tuple next() throws IOException {
+    if (!executedOnce) {
+      Target [] targets = plan.getTargets();
+      Tuple t = new VTuple(targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        t.put(i, targets[i].getEvalTree().eval(inSchema, null));
+      }
+
+      executedOnce = true;
+      return t;
+    } else {
+      return null;
     }
-    return t;
   }
 
   @Override
-  public void rescan() throws IOException {    
+  public void rescan() throws IOException {
+    executedOnce = false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 1f927a6..b0c3c31 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * 
- */
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.CatalogUtil;
@@ -93,7 +90,9 @@ public class StoreTableExec extends UnaryPhysicalExec {
       appender.close();
       // Collect statistics data
       context.setResultStats(appender.getStats());
-      context.addShuffleFileOutput(0, context.getTaskId().toString());
+      if (context.getTaskId() != null) {
+        context.addShuffleFileOutput(0, context.getTaskId().toString());
+      }
     }
 
     appender = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 6caf031..3b81ce2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -22,12 +22,14 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
@@ -35,6 +37,7 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
@@ -46,13 +49,18 @@ import org.apache.tajo.engine.parser.HiveQLAnalyzer;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -61,7 +69,6 @@ import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
 
@@ -242,25 +249,29 @@ public class GlobalEngine extends AbstractService {
       if (targets == null) {
         throw new PlanningException("No targets");
       }
-      Tuple outTuple = new VTuple(targets.length);
+      final Tuple outTuple = new VTuple(targets.length);
       for (int i = 0; i < targets.length; i++) {
         EvalNode eval = targets[i].getEvalTree();
         outTuple.put(i, eval.eval(null, null));
       }
-
-      Schema schema = PlannerUtil.targetToSchema(targets);
-      RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-      byte [] serializedBytes = encoder.toBytes(outTuple);
-      SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
-      serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
-      serializedResBuilder.setSchema(schema.getProto());
-      serializedResBuilder.setBytesNum(serializedBytes.length);
-
-      responseBuilder.setResultSet(serializedResBuilder);
-      responseBuilder.setMaxRowNum(1);
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
+      boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
+      if (isInsert) {
+        InsertNode insertNode = rootNode.getChild();
+        insertNonFromQuery(insertNode, responseBuilder);
+      } else {
+        Schema schema = PlannerUtil.targetToSchema(targets);
+        RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+        byte[] serializedBytes = encoder.toBytes(outTuple);
+        SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+        serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+        serializedResBuilder.setSchema(schema.getProto());
+        serializedResBuilder.setBytesNum(serializedBytes.length);
+
+        responseBuilder.setResultSet(serializedResBuilder);
+        responseBuilder.setMaxRowNum(1);
+        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      }
     } else { // it requires distributed execution. So, the query is forwarded to a query master.
       context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
       hookManager.doHooks(queryContext, plan);
@@ -289,6 +300,107 @@ public class GlobalEngine extends AbstractService {
     return response;
   }
 
+  private void insertNonFromQuery(InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
+      throws Exception {
+    String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
+    String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString());
+
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    fs.mkdirs(stagingResultDir);
+
+    TableDesc tableDesc = null;
+    Path finalOutputDir = null;
+    if (insertNode.getTableName() != null) {
+      tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+      finalOutputDir = tableDesc.getPath();
+    } else {
+      finalOutputDir = insertNode.getPath();
+    }
+
+    TaskAttemptContext taskAttemptContext =
+        new TaskAttemptContext(context.getConf(), null, (CatalogProtos.FragmentProto[]) null, stagingDir);
+    taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+
+    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+    StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+    try {
+      exec.init();
+      exec.next();
+    } finally {
+      exec.close();
+    }
+
+    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+      // it moves the original table into the temporary location.
+      // Then it moves the new result table into the original table location.
+      // Upon failed, it recovers the original table if possible.
+      boolean movedToOldTable = false;
+      boolean committed = false;
+      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+      try {
+        if (fs.exists(finalOutputDir)) {
+          fs.rename(finalOutputDir, oldTableDir);
+          movedToOldTable = fs.exists(oldTableDir);
+        } else { // if the parent does not exist, make its parent directory.
+          fs.mkdirs(finalOutputDir.getParent());
+        }
+        fs.rename(stagingResultDir, finalOutputDir);
+        committed = fs.exists(finalOutputDir);
+      } catch (IOException ioe) {
+        // recover the old table
+        if (movedToOldTable && !committed) {
+          fs.rename(oldTableDir, finalOutputDir);
+        }
+      }
+    } else {
+      FileStatus[] files = fs.listStatus(stagingResultDir);
+      for (FileStatus eachFile: files) {
+        Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+        if (fs.exists(targetFilePath)) {
+          targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+        }
+        fs.rename(eachFile.getPath(), targetFilePath);
+      }
+    }
+
+    if (insertNode.hasTargetTable()) {
+      TableStats stats = tableDesc.getStats();
+      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+      stats.setNumBytes(volume);
+      stats.setNumRows(1);
+
+      catalog.dropTable(insertNode.getTableName());
+      catalog.createTable(tableDesc);
+
+      responseBuilder.setTableDesc(tableDesc.getProto());
+    } else {
+      TableStats stats = new TableStats();
+      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+      stats.setNumBytes(volume);
+      stats.setNumRows(1);
+
+      // Empty TableDesc
+      List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+      CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+          .setTableName(nodeUniqName)
+          .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
+          .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+          .setStats(stats.getProto())
+          .build();
+
+      responseBuilder.setTableDesc(tableDescProto);
+    }
+
+    // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+    responseBuilder.setMaxRowNum(-1);
+    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
+
   public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
     try {
       LOG.info("SQL: " + sql);
@@ -395,7 +507,8 @@ public class GlobalEngine extends AbstractService {
     AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
     builder.setSpaceName(spaceName);
     if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
-      AlterTablespaceCommand.Builder commandBuilder = AlterTablespaceCommand.newBuilder();
+      AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
+          AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
       commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
       commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
       commandBuilder.build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 6a5248d..2848095 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -576,12 +576,12 @@ public class Query implements EventHandler<QueryEvent> {
         query.setResultDesc(finalTable);
       }
     }
+  }
 
-    private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
-      FileSystem fs = tablePath.getFileSystem(systemConf);
-      ContentSummary directorySummary = fs.getContentSummary(tablePath);
-      return directorySummary.getLength();
-    }
+  public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+    ContentSummary directorySummary = fs.getContentSummary(tablePath);
+    return directorySummary.getLength();
   }
 
   public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 35d2cff..f812715 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -364,54 +364,19 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  /**
-   * It initializes the final output and staging directory and sets
-   * them to variables.
-   */
   private void initStagingDir() throws IOException {
-
-    String realUser;
-    String currentUser;
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getLoginUser();
-    realUser = ugi.getShortUserName();
-    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-    FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
-
     Path stagingDir = null;
     Path outputDir = null;
-    try {
-      ////////////////////////////////////////////
-      // Create Output Directory
-      ////////////////////////////////////////////
-
-      stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString());
+    FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
 
-      if (defaultFS.exists(stagingDir)) {
-        throw new IOException("The staging directory '" + stagingDir + "' already exists");
-      }
-      defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
-      FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
-      String owner = fsStatus.getOwner();
-
-      if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
-        throw new IOException("The ownership on the user's query " +
-            "directory " + stagingDir + " is not as expected. " +
-            "It is owned by " + owner + ". The directory must " +
-            "be owned by the submitter " + currentUser + " or " +
-            "by " + realUser);
-      }
+    try {
 
-      if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
-        LOG.info("Permissions on staging directory " + stagingDir + " are " +
-            "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-            "to correct value " + STAGING_DIR_PERMISSION);
-        defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
-      }
+      stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString());
+      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
 
       // Create a subdirectories
-      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
       LOG.info("The staging dir '" + stagingDir + "' is created.");
+
       queryContext.setStagingDir(stagingDir);
 
       /////////////////////////////////////////////////
@@ -435,6 +400,52 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
+  /**
+   * It initializes the final output and staging directory and sets
+   * them to variables.
+   */
+  public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException {
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    Path stagingDir = null;
+
+    ////////////////////////////////////////////
+    // Create Output Directory
+    ////////////////////////////////////////////
+
+    stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);
+
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory '" + stagingDir + "' already exists");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    FileStatus fsStatus = fs.getFileStatus(stagingDir);
+    String owner = fsStatus.getOwner();
+
+    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+      throw new IOException("The ownership on the user's query " +
+          "directory " + stagingDir + " is not as expected. " +
+          "It is owned by " + owner + ". The directory must " +
+          "be owned by the submitter " + currentUser + " or " +
+          "by " + realUser);
+    }
+
+    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+      LOG.info("Permissions on staging directory " + stagingDir + " are " +
+          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+          "to correct value " + STAGING_DIR_PERMISSION);
+      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    }
+
+    return stagingDir;
+  }
+
   public Query getQuery() {
     return query;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 6f3281c..f42df1d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -74,14 +74,16 @@ public class TaskAttemptContext {
                             final Path workDir) {
     this.conf = conf;
     this.queryId = queryId;
-    
-    for(FragmentProto t : fragments) {
-      if (fragmentMap.containsKey(t.getId())) {
-        fragmentMap.get(t.getId()).add(t);
-      } else {
-        List<FragmentProto> frags = new ArrayList<FragmentProto>();
-        frags.add(t);
-        fragmentMap.put(t.getId(), frags);
+
+    if (fragments != null) {
+      for (FragmentProto t : fragments) {
+        if (fragmentMap.containsKey(t.getId())) {
+          fragmentMap.get(t.getId()).add(t);
+        } else {
+          List<FragmentProto> frags = new ArrayList<FragmentProto>();
+          frags.add(t);
+          fragmentMap.put(t.getId(), frags);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 06ce973..8453488 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -32,6 +32,8 @@ import org.apache.tajo.catalog.TableDesc;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.sql.ResultSet;
 
 import static org.junit.Assert.*;
@@ -289,4 +291,96 @@ public class TestInsertQuery extends QueryTestCaseBase {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
   }
+
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    res = executeString("insert overwrite into " + tableName
+        + " select 1::INT4, 2.1::FLOAT4, 'test'; ");
+
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(1, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + ";");
+    assertTrue(res.next());
+
+    assertEquals(3, res.getMetaData().getColumnCount());
+    assertEquals(1, res.getInt(1));
+    assertEquals(2.1f, res.getFloat(2), 10);
+    assertEquals("test", res.getString(3));
+
+    res.close();
+  }
+
+  @Test
+  public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+    ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+    res.close();
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+    res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(1, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + ";");
+    assertTrue(res.next());
+
+    assertEquals(3, res.getMetaData().getColumnCount());
+    assertEquals(1, res.getInt(1));
+    assertEquals("", res.getString(2));
+    assertEquals("test", res.getString(3));
+
+    res.close();
+  }
+
+  @Test
+  public final void testInsertOverwritePathWithNonFromQuery() throws Exception {
+    ResultSet res = executeString("insert overwrite into location " +
+        "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " +
+        "USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+        "select 1::INT4, 2.1::FLOAT4, 'test'");
+
+    res.close();
+    FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+    Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery");
+    assertTrue(fs.exists(path));
+    assertEquals(1, fs.listStatus(path).length);
+
+    CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
+    FileStatus file = fs.listStatus(path)[0];
+    CompressionCodec codec = factory.getCodec(file.getPath());
+    assertTrue(codec instanceof DeflateCodec);
+
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(codec.createInputStream(fs.open(file.getPath()))));
+
+    try {
+      String line = reader.readLine();
+      assertNotNull(line);
+
+      String[] tokens = line.split("\\|");
+
+      assertEquals(3, tokens.length);
+      assertEquals("1", tokens[0]);
+      assertEquals("2.1", tokens[1]);
+      assertEquals("test", tokens[2]);
+    } finally {
+      reader.close();
+    }
+  }
 }