You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/05/20 20:46:38 UTC
[40/48] git commit: TAJO-803: INSERT INTO without FROM throws ClassCastException. (Hyoungjun Kim via hyunsik)
TAJO-803: INSERT INTO without FROM throws ClassCastException. (Hyoungjun Kim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8e800469
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8e800469
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8e800469
Branch: refs/heads/window_function
Commit: 8e8004698f4bd891a415fcc62add59998ced2ae3
Parents: 9bbf87e
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon May 12 16:31:03 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon May 12 16:31:03 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 12 +-
.../java/org/apache/tajo/client/TajoClient.java | 11 +-
.../tajo/engine/planner/LogicalPlanner.java | 39 ++---
.../engine/planner/physical/EvalExprExec.java | 22 ++-
.../engine/planner/physical/StoreTableExec.java | 7 +-
.../org/apache/tajo/master/GlobalEngine.java | 147 ++++++++++++++++---
.../apache/tajo/master/querymaster/Query.java | 10 +-
.../master/querymaster/QueryMasterTask.java | 91 +++++++-----
.../apache/tajo/worker/TaskAttemptContext.java | 18 ++-
.../tajo/engine/query/TestInsertQuery.java | 94 ++++++++++++
11 files changed, 351 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 17af7bc..47884b8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-803: INSERT INTO without FROM throws ClassCastException.
+ (Hyoungjun Kim via hyunsik)
+
TAJO-813: CLI should support comment character with multi-line query.
(Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index c7368c9..e0ca62a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -404,10 +404,18 @@ public class TajoCli {
private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long startTime) {
ResultSet res = null;
try {
- res = TajoClient.createResultSet(client, response);
+ QueryId queryId = new QueryId(response.getQueryId());
float responseTime = ((float)(System.currentTimeMillis() - startTime) / 1000.0f);
TableDesc desc = new TableDesc(response.getTableDesc());
- outputFormatter.printResult(sout, sin, desc, responseTime, res);
+
+ // non-forwarded INSERT INTO query does not have any query id.
+ // In this case, it just returns succeeded query information without printing the query results.
+ if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+ outputFormatter.printResult(sout, sin, desc, responseTime, res);
+ } else {
+ res = TajoClient.createResultSet(client, response);
+ outputFormatter.printResult(sout, sin, desc, responseTime, res);
+ }
} catch (Throwable t) {
outputFormatter.printErrorMessage(sout, t);
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 9e45bf0..7d84592 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -354,10 +354,15 @@ public class TajoClient implements Closeable {
return this.getQueryResultAndWait(queryId);
}
} else {
- if (response.hasResultSet() || response.hasTableDesc()) {
- return createResultSet(this, response);
- } else {
+ // If a non-forwarded insert into query
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() < 0) {
return this.createNullResultSet(queryId);
+ } else {
+ if (response.hasResultSet() || response.hasTableDesc()) {
+ return createResultSet(this, response);
+ } else {
+ return this.createNullResultSet(queryId);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 52c9782..d5d2d47 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -1244,25 +1244,30 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
Schema tableSchema = insertNode.getTableSchema();
Schema targetColumns = insertNode.getTargetSchema();
- ProjectionNode projectionNode = insertNode.getChild();
-
- // Modifying projected columns by adding NULL constants
- // It is because that table appender does not support target columns to be written.
- List<Target> targets = TUtil.newList();
- for (int i = 0, j = 0; i < tableSchema.size(); i++) {
- Column column = tableSchema.getColumn(i);
-
- if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
- targets.add(projectionNode.getTargets()[j++]);
- } else {
- targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
+ LogicalNode child = insertNode.getChild();
+ if (child instanceof Projectable) {
+ Projectable projectionNode = (Projectable) insertNode.getChild();
+
+ // Modifying projected columns by adding NULL constants
+ // It is because that table appender does not support target columns to be written.
+ List<Target> targets = TUtil.newList();
+ for (int i = 0, j = 0; i < tableSchema.size(); i++) {
+ Column column = tableSchema.getColumn(i);
+
+ if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
+ targets.add(projectionNode.getTargets()[j++]);
+ } else {
+ targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
+ }
}
- }
- projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
+ projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
- insertNode.setInSchema(projectionNode.getOutSchema());
- insertNode.setOutSchema(projectionNode.getOutSchema());
- insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+ insertNode.setInSchema(projectionNode.getOutSchema());
+ insertNode.setOutSchema(projectionNode.getOutSchema());
+ insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+ } else {
+ throw new RuntimeException("Wrong child node type: " + child.getType() + " for insert");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index a843bce..b1ab7c4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -29,6 +29,7 @@ import java.io.IOException;
public class EvalExprExec extends PhysicalExec {
private final EvalExprNode plan;
private float progress;
+ private boolean executedOnce = false;
public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
super(context, plan.getInSchema(), plan.getOutSchema());
@@ -41,17 +42,24 @@ public class EvalExprExec extends PhysicalExec {
}
@Override
- public Tuple next() throws IOException {
- Target [] targets = plan.getTargets();
- Tuple t = new VTuple(targets.length);
- for (int i = 0; i < targets.length; i++) {
- t.put(i, targets[i].getEvalTree().eval(inSchema, null));
+ public Tuple next() throws IOException {
+ if (!executedOnce) {
+ Target [] targets = plan.getTargets();
+ Tuple t = new VTuple(targets.length);
+ for (int i = 0; i < targets.length; i++) {
+ t.put(i, targets[i].getEvalTree().eval(inSchema, null));
+ }
+
+ executedOnce = true;
+ return t;
+ } else {
+ return null;
}
- return t;
}
@Override
- public void rescan() throws IOException {
+ public void rescan() throws IOException {
+ executedOnce = false;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 1f927a6..b0c3c31 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -16,9 +16,6 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.CatalogUtil;
@@ -93,7 +90,9 @@ public class StoreTableExec extends UnaryPhysicalExec {
appender.close();
// Collect statistics data
context.setResultStats(appender.getStats());
- context.addShuffleFileOutput(0, context.getTaskId().toString());
+ if (context.getTaskId() != null) {
+ context.addShuffleFileOutput(0, context.getTaskId().toString());
+ }
}
appender = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 6caf031..3b81ce2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -22,12 +22,14 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.AlterTablespaceSetType;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
@@ -35,6 +37,7 @@ import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
@@ -46,13 +49,18 @@ import org.apache.tajo.engine.parser.HiveQLAnalyzer;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.Query;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.sql.SQLException;
@@ -61,7 +69,6 @@ import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
@@ -242,25 +249,29 @@ public class GlobalEngine extends AbstractService {
if (targets == null) {
throw new PlanningException("No targets");
}
- Tuple outTuple = new VTuple(targets.length);
+ final Tuple outTuple = new VTuple(targets.length);
for (int i = 0; i < targets.length; i++) {
EvalNode eval = targets[i].getEvalTree();
outTuple.put(i, eval.eval(null, null));
}
-
- Schema schema = PlannerUtil.targetToSchema(targets);
- RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
- byte [] serializedBytes = encoder.toBytes(outTuple);
- SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
- serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
- serializedResBuilder.setSchema(schema.getProto());
- serializedResBuilder.setBytesNum(serializedBytes.length);
-
- responseBuilder.setResultSet(serializedResBuilder);
- responseBuilder.setMaxRowNum(1);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
+ boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
+ if (isInsert) {
+ InsertNode insertNode = rootNode.getChild();
+ insertNonFromQuery(insertNode, responseBuilder);
+ } else {
+ Schema schema = PlannerUtil.targetToSchema(targets);
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+ byte[] serializedBytes = encoder.toBytes(outTuple);
+ SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+ serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+ serializedResBuilder.setSchema(schema.getProto());
+ serializedResBuilder.setBytesNum(serializedBytes.length);
+
+ responseBuilder.setResultSet(serializedResBuilder);
+ responseBuilder.setMaxRowNum(1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ }
} else { // it requires distributed execution. So, the query is forwarded to a query master.
context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
@@ -289,6 +300,107 @@ public class GlobalEngine extends AbstractService {
return response;
}
+ private void insertNonFromQuery(InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
+ throws Exception {
+ String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
+ String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+ FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+ Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString());
+
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ fs.mkdirs(stagingResultDir);
+
+ TableDesc tableDesc = null;
+ Path finalOutputDir = null;
+ if (insertNode.getTableName() != null) {
+ tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+ finalOutputDir = tableDesc.getPath();
+ } else {
+ finalOutputDir = insertNode.getPath();
+ }
+
+ TaskAttemptContext taskAttemptContext =
+ new TaskAttemptContext(context.getConf(), null, (CatalogProtos.FragmentProto[]) null, stagingDir);
+ taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+ try {
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
+ }
+
+ if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+ // it moves the original table into the temporary location.
+ // Then it moves the new result table into the original table location.
+ // Upon failed, it recovers the original table if possible.
+ boolean movedToOldTable = false;
+ boolean committed = false;
+ Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+ try {
+ if (fs.exists(finalOutputDir)) {
+ fs.rename(finalOutputDir, oldTableDir);
+ movedToOldTable = fs.exists(oldTableDir);
+ } else { // if the parent does not exist, make its parent directory.
+ fs.mkdirs(finalOutputDir.getParent());
+ }
+ fs.rename(stagingResultDir, finalOutputDir);
+ committed = fs.exists(finalOutputDir);
+ } catch (IOException ioe) {
+ // recover the old table
+ if (movedToOldTable && !committed) {
+ fs.rename(oldTableDir, finalOutputDir);
+ }
+ }
+ } else {
+ FileStatus[] files = fs.listStatus(stagingResultDir);
+ for (FileStatus eachFile: files) {
+ Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+ if (fs.exists(targetFilePath)) {
+ targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+ }
+ fs.rename(eachFile.getPath(), targetFilePath);
+ }
+ }
+
+ if (insertNode.hasTargetTable()) {
+ TableStats stats = tableDesc.getStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
+
+ catalog.dropTable(insertNode.getTableName());
+ catalog.createTable(tableDesc);
+
+ responseBuilder.setTableDesc(tableDesc.getProto());
+ } else {
+ TableStats stats = new TableStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
+
+ // Empty TableDesc
+ List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+ CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+ .setTableName(nodeUniqName)
+ .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
+ .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+ .setStats(stats.getProto())
+ .build();
+
+ responseBuilder.setTableDesc(tableDescProto);
+ }
+
+ // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+ responseBuilder.setMaxRowNum(-1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ }
+
+
public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
try {
LOG.info("SQL: " + sql);
@@ -395,7 +507,8 @@ public class GlobalEngine extends AbstractService {
AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
builder.setSpaceName(spaceName);
if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
- AlterTablespaceCommand.Builder commandBuilder = AlterTablespaceCommand.newBuilder();
+ AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
+ AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
commandBuilder.build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 6a5248d..2848095 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -576,12 +576,12 @@ public class Query implements EventHandler<QueryEvent> {
query.setResultDesc(finalTable);
}
}
+ }
- private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(systemConf);
- ContentSummary directorySummary = fs.getContentSummary(tablePath);
- return directorySummary.getLength();
- }
+ public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(systemConf);
+ ContentSummary directorySummary = fs.getContentSummary(tablePath);
+ return directorySummary.getLength();
}
public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 35d2cff..f812715 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -364,54 +364,19 @@ public class QueryMasterTask extends CompositeService {
}
}
- /**
- * It initializes the final output and staging directory and sets
- * them to variables.
- */
private void initStagingDir() throws IOException {
-
- String realUser;
- String currentUser;
- UserGroupInformation ugi;
- ugi = UserGroupInformation.getLoginUser();
- realUser = ugi.getShortUserName();
- currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
-
Path stagingDir = null;
Path outputDir = null;
- try {
- ////////////////////////////////////////////
- // Create Output Directory
- ////////////////////////////////////////////
-
- stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString());
+ FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
- if (defaultFS.exists(stagingDir)) {
- throw new IOException("The staging directory '" + stagingDir + "' already exists");
- }
- defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
- FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
- String owner = fsStatus.getOwner();
-
- if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
- throw new IOException("The ownership on the user's query " +
- "directory " + stagingDir + " is not as expected. " +
- "It is owned by " + owner + ". The directory must " +
- "be owned by the submitter " + currentUser + " or " +
- "by " + realUser);
- }
+ try {
- if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
- LOG.info("Permissions on staging directory " + stagingDir + " are " +
- "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
- "to correct value " + STAGING_DIR_PERMISSION);
- defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
- }
+ stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString());
+ defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
// Create a subdirectories
- defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
LOG.info("The staging dir '" + stagingDir + "' is created.");
+
queryContext.setStagingDir(stagingDir);
/////////////////////////////////////////////////
@@ -435,6 +400,52 @@ public class QueryMasterTask extends CompositeService {
}
}
+ /**
+ * It initializes the final output and staging directory and sets
+ * them to variables.
+ */
+ public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException {
+
+ String realUser;
+ String currentUser;
+ UserGroupInformation ugi;
+ ugi = UserGroupInformation.getLoginUser();
+ realUser = ugi.getShortUserName();
+ currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ Path stagingDir = null;
+
+ ////////////////////////////////////////////
+ // Create Output Directory
+ ////////////////////////////////////////////
+
+ stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);
+
+ if (fs.exists(stagingDir)) {
+ throw new IOException("The staging directory '" + stagingDir + "' already exists");
+ }
+ fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ FileStatus fsStatus = fs.getFileStatus(stagingDir);
+ String owner = fsStatus.getOwner();
+
+ if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+ throw new IOException("The ownership on the user's query " +
+ "directory " + stagingDir + " is not as expected. " +
+ "It is owned by " + owner + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser);
+ }
+
+ if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+ LOG.info("Permissions on staging directory " + stagingDir + " are " +
+ "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+ "to correct value " + STAGING_DIR_PERMISSION);
+ fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ }
+
+ return stagingDir;
+ }
+
public Query getQuery() {
return query;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 6f3281c..f42df1d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -74,14 +74,16 @@ public class TaskAttemptContext {
final Path workDir) {
this.conf = conf;
this.queryId = queryId;
-
- for(FragmentProto t : fragments) {
- if (fragmentMap.containsKey(t.getId())) {
- fragmentMap.get(t.getId()).add(t);
- } else {
- List<FragmentProto> frags = new ArrayList<FragmentProto>();
- frags.add(t);
- fragmentMap.put(t.getId(), frags);
+
+ if (fragments != null) {
+ for (FragmentProto t : fragments) {
+ if (fragmentMap.containsKey(t.getId())) {
+ fragmentMap.get(t.getId()).add(t);
+ } else {
+ List<FragmentProto> frags = new ArrayList<FragmentProto>();
+ frags.add(t);
+ fragmentMap.put(t.getId(), frags);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 06ce973..8453488 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -32,6 +32,8 @@ import org.apache.tajo.catalog.TableDesc;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.sql.ResultSet;
import static org.junit.Assert.*;
@@ -289,4 +291,96 @@ public class TestInsertQuery extends QueryTestCaseBase {
assertEquals(5, desc.getStats().getNumRows().intValue());
}
}
+
+ @Test
+ public final void testInsertOverwriteTableWithNonFromQuery() throws Exception {
+ String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+ ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+ res.close();
+ CatalogService catalog = testingCluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+ res = executeString("insert overwrite into " + tableName
+ + " select 1::INT4, 2.1::FLOAT4, 'test'; ");
+
+ res.close();
+
+ TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+ if (!testingCluster.isHCatalogStoreRunning()) {
+ assertEquals(1, desc.getStats().getNumRows().intValue());
+ }
+
+ res = executeString("select * from " + tableName + ";");
+ assertTrue(res.next());
+
+ assertEquals(3, res.getMetaData().getColumnCount());
+ assertEquals(1, res.getInt(1));
+ assertEquals(2.1f, res.getFloat(2), 10);
+ assertEquals("test", res.getString(3));
+
+ res.close();
+ }
+
+ @Test
+ public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception {
+ String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery");
+ ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)");
+ res.close();
+ CatalogService catalog = testingCluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
+
+ res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';");
+ res.close();
+
+ TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
+ if (!testingCluster.isHCatalogStoreRunning()) {
+ assertEquals(1, desc.getStats().getNumRows().intValue());
+ }
+
+ res = executeString("select * from " + tableName + ";");
+ assertTrue(res.next());
+
+ assertEquals(3, res.getMetaData().getColumnCount());
+ assertEquals(1, res.getInt(1));
+ assertEquals("", res.getString(2));
+ assertEquals("test", res.getString(3));
+
+ res.close();
+ }
+
+ @Test
+ public final void testInsertOverwritePathWithNonFromQuery() throws Exception {
+ ResultSet res = executeString("insert overwrite into location " +
+ "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " +
+ "USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+ "select 1::INT4, 2.1::FLOAT4, 'test'");
+
+ res.close();
+ FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
+ Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery");
+ assertTrue(fs.exists(path));
+ assertEquals(1, fs.listStatus(path).length);
+
+ CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
+ FileStatus file = fs.listStatus(path)[0];
+ CompressionCodec codec = factory.getCodec(file.getPath());
+ assertTrue(codec instanceof DeflateCodec);
+
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(codec.createInputStream(fs.open(file.getPath()))));
+
+ try {
+ String line = reader.readLine();
+ assertNotNull(line);
+
+ String[] tokens = line.split("\\|");
+
+ assertEquals(3, tokens.length);
+ assertEquals("1", tokens[0]);
+ assertEquals("2.1", tokens[1]);
+ assertEquals("test", tokens[2]);
+ } finally {
+ reader.close();
+ }
+ }
}