You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/22 07:10:43 UTC
[1/2] tajo git commit: TAJO-1261: Separate query and ddl execution
codes from GlobalEngine.
Repository: tajo
Updated Branches:
refs/heads/master 341310746 -> a4c348423
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 9ab4f0a..c96b86e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -37,6 +37,8 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
+import org.apache.tajo.master.exec.prehook.InsertIntoHook;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -390,10 +392,6 @@ public class QueryMasterTask extends CompositeService {
optimizer.optimize(queryContext, plan);
- GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
- hookManager.addHook(new GlobalEngine.InsertHook());
- hookManager.doHooks(queryContext, plan);
-
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
if (scanNodes != null) {
[2/2] tajo git commit: TAJO-1261: Separate query and ddl execution
codes from GlobalEngine.
Posted by hy...@apache.org.
TAJO-1261: Separate query and ddl execution codes from GlobalEngine.
Closes #312
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a4c34842
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a4c34842
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a4c34842
Branch: refs/heads/master
Commit: a4c3484232e139d4d21d0cfb9c31e5d784de652b
Parents: 3413107
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Dec 19 20:55:26 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 22 15:07:17 2014 +0900
----------------------------------------------------------------------
CHANGES | 7 +-
.../org/apache/tajo/master/GlobalEngine.java | 789 +------------------
.../java/org/apache/tajo/master/TajoMaster.java | 2 +-
.../tajo/master/TajoMasterClientService.java | 9 +-
.../apache/tajo/master/exec/DDLExecutor.java | 434 ++++++++++
.../apache/tajo/master/exec/QueryExecutor.java | 391 +++++++++
.../master/exec/prehook/CreateTableHook.java | 53 ++
.../exec/prehook/DistributedQueryHook.java | 27 +
.../prehook/DistributedQueryHookManager.java | 45 ++
.../master/exec/prehook/InsertIntoHook.java | 61 ++
.../master/querymaster/QueryMasterTask.java | 6 +-
11 files changed, 1056 insertions(+), 768 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e30e24c..36cff8a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,8 +24,11 @@ Release 0.9.1 - unreleased
IMPROVEMENT
- TAJO-1247: Store type 'TEXTFILE' should be TEXT while keeping enum 'TEXTFILE' in protobuf.
- (DaeMyung Kang via hyunsik)
+ TAJO-1261: Separate query and ddl execution codes from GlobalEngine.
+ (hyunsik)
+
+ TAJO-1247: Store type 'TEXTFILE' should be TEXT while keeping enum
+ 'TEXTFILE' in protobuf. (DaeMyung Kang via hyunsik)
TAJO-1221: HA TajoClient should not connect TajoMaster at the first.
(jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 71b1f9b..d7e7670 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -19,66 +19,42 @@
package org.apache.tajo.master;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.algebra.AlterTablespaceSetType;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
-import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryInfo;
-import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.exec.DDLExecutor;
+import org.apache.tajo.master.exec.QueryExecutor;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.plan.*;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.ProtoUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimeZone;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
+
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
public class GlobalEngine extends AbstractService {
@@ -96,11 +72,17 @@ public class GlobalEngine extends AbstractService {
private LogicalPlanVerifier annotatedPlanVerifier;
private DistributedQueryHookManager hookManager;
+ private QueryExecutor queryExecutor;
+ private DDLExecutor ddlExecutor;
+
public GlobalEngine(final MasterContext context) {
super(GlobalEngine.class.getName());
this.context = context;
this.catalog = context.getCatalog();
this.sm = context.getStorageManager();
+
+ this.ddlExecutor = new DDLExecutor(context);
+ this.queryExecutor = new QueryExecutor(context, ddlExecutor);
}
public void start() {
@@ -110,10 +92,6 @@ public class GlobalEngine extends AbstractService {
planner = new LogicalPlanner(context.getCatalog());
optimizer = new LogicalOptimizer(context.getConf());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
-
- hookManager = new DistributedQueryHookManager();
- hookManager.addHook(new CreateTableHook());
- hookManager.addHook(new InsertHook());
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
}
@@ -148,6 +126,14 @@ public class GlobalEngine extends AbstractService {
return annotatedPlanVerifier;
}
+ public DDLExecutor getDDLExecutor() {
+ return ddlExecutor;
+ }
+
+ public QueryExecutor getQueryExecutor() {
+ return queryExecutor;
+ }
+
private QueryContext createQueryContext(Session session) {
QueryContext newQueryContext = new QueryContext(context.getConf(), session);
@@ -168,12 +154,12 @@ public class GlobalEngine extends AbstractService {
if (isJson) {
planningContext = buildExpressionFromJson(query);
} else {
- planningContext = buildExpressionFromSql(queryContext, query);
+ planningContext = buildExpressionFromSql(query);
}
String jsonExpr = planningContext.toJson();
LogicalPlan plan = createLogicalPlan(queryContext, planningContext);
- SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, query, jsonExpr);
+ SubmitQueryResponse response = queryExecutor.execute(queryContext, session, query, jsonExpr, plan);
return response;
} catch (Throwable t) {
context.getSystemMetrics().counter("Query", "errorQuery").inc();
@@ -197,288 +183,14 @@ public class GlobalEngine extends AbstractService {
return JsonHelper.fromJson(json, Expr.class);
}
- public Expr buildExpressionFromSql(QueryContext queryContext, String sql)
- throws InterruptedException, IOException, IllegalQueryStatusException {
+ public Expr buildExpressionFromSql(String sql) throws InterruptedException, IOException,
+ IllegalQueryStatusException {
context.getSystemMetrics().counter("Query", "totalQuery").inc();
return analyzer.parse(sql);
}
- private SubmitQueryResponse executeQueryInternal(QueryContext queryContext,
- Session session,
- LogicalPlan plan,
- String sql,
- String jsonExpr) throws Exception {
-
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-
- SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
- responseBuilder.setIsForwarded(false);
- responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
-
- if (PlannerUtil.checkIfSetSession(rootNode)) {
-
- SetSessionNode setSessionNode = rootNode.getChild();
-
- final String varName = setSessionNode.getName();
-
- // SET CATALOG 'XXX'
- if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
- String databaseName = setSessionNode.getValue();
-
- if (catalog.existDatabase(databaseName)) {
- session.selectDatabase(setSessionNode.getValue());
- } else {
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
- responseBuilder.setErrorMessage("database \"" + databaseName + "\" does not exists.");
- return responseBuilder.build();
- }
-
- // others
- } else {
- if (setSessionNode.isDefaultValue()) {
- session.removeVariable(varName);
- } else {
- session.setVariable(varName, setSessionNode.getValue());
- }
- }
-
- context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
- } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
- context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
- updateQuery(queryContext, rootNode.getChild());
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
- } else if (plan.isExplain()) { // explain query
- String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
- Schema schema = new Schema();
- schema.addColumn("explain", TajoDataTypes.Type.TEXT);
- RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
-
- SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
-
- VTuple tuple = new VTuple(1);
- String[] lines = explainStr.split("\n");
- int bytesNum = 0;
- for (String line : lines) {
- tuple.put(0, DatumFactory.createText(line));
- byte [] encodedData = encoder.toBytes(tuple);
- bytesNum += encodedData.length;
- serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
- }
- serializedResBuilder.setSchema(schema.getProto());
- serializedResBuilder.setBytesNum(bytesNum);
-
- responseBuilder.setResultSet(serializedResBuilder.build());
- responseBuilder.setMaxRowNum(lines.length);
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-
- // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
- } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
- ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
- if (scanNode == null) {
- scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
- }
- TableDesc desc = scanNode.getTableDesc();
- int maxRow = Integer.MAX_VALUE;
- if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
- LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
- maxRow = (int) limitNode.getFetchFirstNum();
- }
- if (desc.getStats().getNumRows() == 0) {
- desc.getStats().setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
- }
-
- QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
-
- NonForwardQueryResultScanner queryResultScanner =
- new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
-
- queryResultScanner.init();
- session.addNonForwardQueryResultScanner(queryResultScanner);
-
- responseBuilder.setQueryId(queryId.getProto());
- responseBuilder.setMaxRowNum(maxRow);
- responseBuilder.setTableDesc(desc.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-
- // NonFromQuery indicates a form of 'select a, x+y;'
- } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
- Target[] targets = plan.getRootBlock().getRawTargets();
- if (targets == null) {
- throw new PlanningException("No targets");
- }
- final Tuple outTuple = new VTuple(targets.length);
- for (int i = 0; i < targets.length; i++) {
- EvalNode eval = targets[i].getEvalTree();
- outTuple.put(i, eval.eval(null, null));
- }
- boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
- if (isInsert) {
- InsertNode insertNode = rootNode.getChild();
- insertNonFromQuery(queryContext, insertNode, responseBuilder);
- } else {
- Schema schema = PlannerUtil.targetToSchema(targets);
- RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
- byte[] serializedBytes = encoder.toBytes(outTuple);
- SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
- serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
- serializedResBuilder.setSchema(schema.getProto());
- serializedResBuilder.setBytesNum(serializedBytes.length);
-
- responseBuilder.setResultSet(serializedResBuilder);
- responseBuilder.setMaxRowNum(1);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
- }
- } else { // it requires distributed execution. So, the query is forwarded to a query master.
- StoreType storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (!storageProperty.isSupportsInsertInto()) {
- throw new VerifyException("Inserting into non-file storage is not supported.");
- }
- sm.beforeInsertOrCATS(rootNode.getChild());
- }
- context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
- hookManager.doHooks(queryContext, plan);
-
- QueryJobManager queryJobManager = this.context.getQueryJobManager();
- QueryInfo queryInfo;
-
- queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
-
- if(queryInfo == null) {
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
- responseBuilder.setErrorMessage("Fail starting QueryMaster.");
- LOG.error("Fail starting QueryMaster: " + sql);
- } else {
- responseBuilder.setIsForwarded(true);
- responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
- if(queryInfo.getQueryMasterHost() != null) {
- responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
- }
- responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
- LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
- " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
- }
- }
-
- responseBuilder.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));
- SubmitQueryResponse response = responseBuilder.build();
- return response;
- }
-
- private void insertNonFromQuery(QueryContext queryContext, InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
- throws Exception {
- String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
- String queryId = nodeUniqName + "_" + System.currentTimeMillis();
-
- FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
- Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
- Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-
- TableDesc tableDesc = null;
- Path finalOutputDir = null;
- if (insertNode.getTableName() != null) {
- tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
- finalOutputDir = new Path(tableDesc.getPath());
- } else {
- finalOutputDir = insertNode.getPath();
- }
-
- TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
- taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
-
- EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
- StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
- try {
- exec.init();
- exec.next();
- } finally {
- exec.close();
- }
-
- if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
- // it moves the original table into the temporary location.
- // Then it moves the new result table into the original table location.
- // Upon failed, it recovers the original table if possible.
- boolean movedToOldTable = false;
- boolean committed = false;
- Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
- try {
- if (fs.exists(finalOutputDir)) {
- fs.rename(finalOutputDir, oldTableDir);
- movedToOldTable = fs.exists(oldTableDir);
- } else { // if the parent does not exist, make its parent directory.
- fs.mkdirs(finalOutputDir.getParent());
- }
- fs.rename(stagingResultDir, finalOutputDir);
- committed = fs.exists(finalOutputDir);
- } catch (IOException ioe) {
- // recover the old table
- if (movedToOldTable && !committed) {
- fs.rename(oldTableDir, finalOutputDir);
- }
- }
- } else {
- FileStatus[] files = fs.listStatus(stagingResultDir);
- for (FileStatus eachFile: files) {
- Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
- if (fs.exists(targetFilePath)) {
- targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
- }
- fs.rename(eachFile.getPath(), targetFilePath);
- }
- }
-
- if (insertNode.hasTargetTable()) {
- TableStats stats = tableDesc.getStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
-
- UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
- builder.setTableName(tableDesc.getName());
- builder.setStats(stats.getProto());
-
- catalog.updateTableStats(builder.build());
-
- responseBuilder.setTableDesc(tableDesc.getProto());
- } else {
- TableStats stats = new TableStats();
- long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
- stats.setNumBytes(volume);
- stats.setNumRows(1);
-
- // Empty TableDesc
- List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
- CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
- .setTableName(nodeUniqName)
- .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
- .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
- .setStats(stats.getProto())
- .build();
-
- responseBuilder.setTableDesc(tableDescProto);
- }
-
- // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
- responseBuilder.setMaxRowNum(-1);
- responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
- }
-
-
- public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
+ public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException,
+ SQLException, PlanningException {
try {
LOG.info("SQL: " + sql);
@@ -496,7 +208,7 @@ public class GlobalEngine extends AbstractService {
if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
throw new SQLException("This is not update query:\n" + sql);
} else {
- updateQuery(queryContext, rootNode.getChild());
+ ddlExecutor.execute(queryContext, plan);
return QueryIdFactory.NULL_QUERY_ID;
}
} catch (Exception e) {
@@ -505,44 +217,6 @@ public class GlobalEngine extends AbstractService {
}
}
- private boolean updateQuery(QueryContext queryContext, LogicalNode root) throws IOException {
-
- switch (root.getType()) {
- case SET_SESSION:
-
- case CREATE_DATABASE:
- CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
- createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
- return true;
- case DROP_DATABASE:
- DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
- dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
- return true;
- case CREATE_TABLE:
- CreateTableNode createTable = (CreateTableNode) root;
- createTable(queryContext, createTable, createTable.isIfNotExists());
- return true;
- case DROP_TABLE:
- DropTableNode dropTable = (DropTableNode) root;
- dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
- return true;
- case ALTER_TABLESPACE:
- AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
- alterTablespace(queryContext, alterTablespace);
- return true;
- case ALTER_TABLE:
- AlterTableNode alterTable = (AlterTableNode) root;
- alterTable(queryContext,alterTable);
- return true;
- case TRUNCATE_TABLE:
- TruncateTableNode truncateTable = (TruncateTableNode) root;
- truncateTable(queryContext, truncateTable);
- return true;
- default:
- throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
- }
- }
-
private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws PlanningException {
VerificationState state = new VerificationState();
@@ -599,403 +273,4 @@ public class GlobalEngine extends AbstractService {
}
}
}
-
- /**
- * Alter a given table
- */
- public void alterTablespace(final QueryContext queryContext, final AlterTablespaceNode alterTablespace) {
-
- final CatalogService catalog = context.getCatalog();
- final String spaceName = alterTablespace.getTablespaceName();
-
- AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
- builder.setSpaceName(spaceName);
- if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
- AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
- AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
- commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
- commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
- commandBuilder.build();
- builder.addCommand(commandBuilder);
- } else {
- throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
- }
-
- catalog.alterTablespace(builder.build());
- }
-
- /**
- * Alter a given table
- */
- public void alterTable(final QueryContext queryContext, final AlterTableNode alterTable) throws IOException {
-
- final CatalogService catalog = context.getCatalog();
- final String tableName = alterTable.getTableName();
-
- String databaseName;
- String simpleTableName;
- if (CatalogUtil.isFQTableName(tableName)) {
- String[] split = CatalogUtil.splitFQTableName(tableName);
- databaseName = split[0];
- simpleTableName = split[1];
- } else {
- databaseName = queryContext.getCurrentDatabase();
- simpleTableName = tableName;
- }
- final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
- if (!catalog.existsTable(databaseName, simpleTableName)) {
- throw new NoSuchTableException(qualifiedName);
- }
-
- switch (alterTable.getAlterTableOpType()) {
- case RENAME_TABLE:
- if (!catalog.existsTable(databaseName, simpleTableName)) {
- throw new NoSuchTableException(alterTable.getTableName());
- }
- if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
- throw new AlreadyExistsTableException(alterTable.getNewTableName());
- }
-
- TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
-
- if (!desc.isExternal()) { // if the table is the managed table
- Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
- databaseName, simpleTableName);
- Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
- databaseName, alterTable.getNewTableName());
- FileSystem fs = oldPath.getFileSystem(context.getConf());
-
- if (!fs.exists(oldPath)) {
- throw new IOException("No such a table directory: " + oldPath);
- }
- if (fs.exists(newPath)) {
- throw new IOException("Already table directory exists: " + newPath);
- }
-
- fs.rename(oldPath, newPath);
- }
- catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
- AlterTableType.RENAME_TABLE));
- break;
- case RENAME_COLUMN:
- if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
- throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
- }
- catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
- alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
- break;
- case ADD_COLUMN:
- if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
- throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
- }
- catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
- break;
- default:
- //TODO
- }
- }
-
- /**
- * Truncate table a given table
- */
- public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode)
- throws IOException {
- List<String> tableNames = truncateTableNode.getTableNames();
- final CatalogService catalog = context.getCatalog();
-
- String databaseName;
- String simpleTableName;
-
- List<TableDesc> tableDescList = new ArrayList<TableDesc>();
- for (String eachTableName: tableNames) {
- if (CatalogUtil.isFQTableName(eachTableName)) {
- String[] split = CatalogUtil.splitFQTableName(eachTableName);
- databaseName = split[0];
- simpleTableName = split[1];
- } else {
- databaseName = queryContext.getCurrentDatabase();
- simpleTableName = eachTableName;
- }
- final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
- if (!catalog.existsTable(databaseName, simpleTableName)) {
- throw new NoSuchTableException(qualifiedName);
- }
-
- Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
- TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
- Path tablePath = new Path(tableDesc.getPath());
- if (tablePath.getParent() == null ||
- !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
- throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
- ", warehouse dir=" + warehousePath);
- }
- tableDescList.add(tableDesc);
- }
-
- for (TableDesc eachTable: tableDescList) {
- Path path = new Path(eachTable.getPath());
- LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
- FileSystem fs = path.getFileSystem(context.getConf());
-
- FileStatus[] files = fs.listStatus(path);
- if (files != null) {
- for (FileStatus eachFile: files) {
- fs.delete(eachFile.getPath(), true);
- }
- }
- }
- }
-
- private boolean existColumnName(String tableName, String columnName) {
- final TableDesc tableDesc = catalog.getTableDesc(tableName);
- return tableDesc.getSchema().containsByName(columnName) ? true : false;
- }
-
- private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists) throws IOException {
- TableMeta meta;
-
- if (createTable.hasOptions()) {
- meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions());
- } else {
- meta = CatalogUtil.newTableMeta(createTable.getStorageType());
- }
-
- if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
- Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
- }
-
- return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
- createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
- createTable.getPartitionMethod(), ifNotExists);
- }
-
- public TableDesc createTable(QueryContext queryContext, String tableName, StoreType storeType,
- Schema schema, TableMeta meta, Path path, boolean isExternal,
- PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
- String databaseName;
- String simpleTableName;
- if (CatalogUtil.isFQTableName(tableName)) {
- String [] splitted = CatalogUtil.splitFQTableName(tableName);
- databaseName = splitted[0];
- simpleTableName = splitted[1];
- } else {
- databaseName = queryContext.getCurrentDatabase();
- simpleTableName = tableName;
- }
- String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
- boolean exists = catalog.existsTable(databaseName, simpleTableName);
-
- if (exists) {
- if (ifNotExists) {
- LOG.info("relation \"" + qualifiedName + "\" is already exists." );
- return catalog.getTableDesc(databaseName, simpleTableName);
- } else {
- throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
- }
- }
-
- TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
- schema, meta, (path != null ? path.toUri(): null), isExternal);
-
- if (partitionDesc != null) {
- desc.setPartitionMethod(partitionDesc);
- }
-
- StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
-
- if (catalog.createTable(desc)) {
- LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
- return desc;
- } else {
- LOG.info("Table creation " + tableName + " is failed.");
- throw new CatalogException("Cannot create table \"" + tableName + "\".");
- }
- }
-
- public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName,
- @Nullable String tablespace,
- boolean ifNotExists) throws IOException {
-
- String tablespaceName;
- if (tablespace == null) {
- tablespaceName = DEFAULT_TABLESPACE_NAME;
- } else {
- tablespaceName = tablespace;
- }
-
- // CREATE DATABASE IF NOT EXISTS
- boolean exists = catalog.existDatabase(databaseName);
- if (exists) {
- if (ifNotExists) {
- LOG.info("database \"" + databaseName + "\" is already exists." );
- return true;
- } else {
- throw new AlreadyExistsDatabaseException(databaseName);
- }
- }
-
- if (catalog.createDatabase(databaseName, tablespaceName)) {
- String normalized = databaseName;
- Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized);
- FileSystem fs = databaseDir.getFileSystem(context.getConf());
- fs.mkdirs(databaseDir);
- }
-
- return true;
- }
-
- public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) {
-
- boolean exists = catalog.existDatabase(databaseName);
- if(!exists) {
- if (ifExists) { // DROP DATABASE IF EXISTS
- LOG.info("database \"" + databaseName + "\" does not exists." );
- return true;
- } else { // Otherwise, it causes an exception.
- throw new NoSuchDatabaseException(databaseName);
- }
- }
-
- if (queryContext.getCurrentDatabase().equals(databaseName)) {
- throw new RuntimeException("ERROR: Cannot drop the current open database");
- }
-
- boolean result = catalog.dropDatabase(databaseName);
- LOG.info("database " + databaseName + " is dropped.");
- return result;
- }
-
- /**
- * Drop a given named table
- *
- * @param tableName to be dropped
- * @param purge Remove all data if purge is true.
- */
- public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) {
- CatalogService catalog = context.getCatalog();
-
- String databaseName;
- String simpleTableName;
- if (CatalogUtil.isFQTableName(tableName)) {
- String [] splitted = CatalogUtil.splitFQTableName(tableName);
- databaseName = splitted[0];
- simpleTableName = splitted[1];
- } else {
- databaseName = queryContext.getCurrentDatabase();
- simpleTableName = tableName;
- }
- String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
-
- boolean exists = catalog.existsTable(qualifiedName);
- if(!exists) {
- if (ifExists) { // DROP TABLE IF EXISTS
- LOG.info("relation \"" + qualifiedName + "\" is already exists." );
- return true;
- } else { // Otherwise, it causes an exception.
- throw new NoSuchTableException(qualifiedName);
- }
- }
-
- TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
- catalog.dropTable(qualifiedName);
-
- if (purge) {
- try {
- StorageManager.getStorageManager(queryContext.getConf(),
- tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
- } catch (IOException e) {
- throw new InternalError(e.getMessage());
- }
- }
-
- LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
- return true;
- }
-
- public interface DistributedQueryHook {
- boolean isEligible(QueryContext queryContext, LogicalPlan plan);
- void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
- }
-
- public static class DistributedQueryHookManager {
- private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
- public void addHook(DistributedQueryHook hook) {
- hooks.add(hook);
- }
-
- public void doHooks(QueryContext queryContext, LogicalPlan plan) {
- for (DistributedQueryHook hook : hooks) {
- if (hook.isEligible(queryContext, plan)) {
- try {
- hook.hook(queryContext, plan);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
- }
- }
- }
-
- public class CreateTableHook implements DistributedQueryHook {
-
- @Override
- public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
- }
-
- @Override
- public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- CreateTableNode createTableNode = rootNode.getChild();
- String [] splitted = CatalogUtil.splitFQTableName(createTableNode.getTableName());
- String databaseName = splitted[0];
- String tableName = splitted[1];
- queryContext.setOutputTable(tableName);
- queryContext.setOutputPath(
- StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
- if(createTableNode.getPartitionMethod() != null) {
- queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
- }
- queryContext.setCreateTable();
- }
- }
-
- public static class InsertHook implements DistributedQueryHook {
-
- @Override
- public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
- return plan.getRootBlock().getRootType() == NodeType.INSERT;
- }
-
- @Override
- public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
- queryContext.setInsert();
-
- InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
-
- // Set QueryContext settings, such as output table name and output path.
- // It also remove data files if overwrite is true.
- Path outputPath;
- if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
- queryContext.setOutputTable(insertNode.getTableName());
- queryContext.setOutputPath(insertNode.getPath());
- if (insertNode.hasPartition()) {
- queryContext.setPartitionMethod(insertNode.getPartitionMethod());
- }
- } else { // INSERT INTO LOCATION ...
- // When INSERT INTO LOCATION, must not set output table.
- outputPath = insertNode.getPath();
- queryContext.setFileOutput();
- queryContext.setOutputPath(outputPath);
- }
-
- if (insertNode.isOverwrite()) {
- queryContext.setOutputOverwrite();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index f307127..d021e43 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -371,7 +371,7 @@ public class TajoMaster extends CompositeService {
}
if (!catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
- globalEngine.createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
+ globalEngine.getDDLExecutor().createDatabase(null, DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME, false);
} else {
LOG.info(String.format("Default database (%s) is already prepared.", DEFAULT_DATABASE_NAME));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7014034..ee99353 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -658,7 +658,7 @@ public class TajoMasterClientService extends AbstractService {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
- if (context.getGlobalEngine().createDatabase(queryContext, request.getValue(), null, false)) {
+ if (context.getGlobalEngine().getDDLExecutor().createDatabase(queryContext, request.getValue(), null, false)) {
return BOOL_TRUE;
} else {
return BOOL_FALSE;
@@ -688,7 +688,7 @@ public class TajoMasterClientService extends AbstractService {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
- if (context.getGlobalEngine().dropDatabase(queryContext, request.getValue(), false)) {
+ if (context.getGlobalEngine().getDDLExecutor().dropDatabase(queryContext, request.getValue(), false)) {
return BOOL_TRUE;
} else {
return BOOL_FALSE;
@@ -814,7 +814,7 @@ public class TajoMasterClientService extends AbstractService {
TableDesc desc;
try {
- desc = context.getGlobalEngine().createTable(queryContext, request.getName(),
+ desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
meta.getStoreType(), schema,
meta, path, true, partitionDesc, false);
} catch (Exception e) {
@@ -843,7 +843,8 @@ public class TajoMasterClientService extends AbstractService {
Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
- context.getGlobalEngine().dropTable(queryContext, dropTable.getName(), false, dropTable.getPurge());
+ context.getGlobalEngine().getDDLExecutor().dropTable(queryContext, dropTable.getName(), false,
+ dropTable.getPurge());
return BOOL_TRUE;
} catch (Throwable t) {
throw new ServiceException(t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
new file mode 100644
index 0000000..acbaa01
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.AlterTablespaceSetType;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+/**
+ * Executor for DDL statements. They are executed on only TajoMaster.
+ */
+public class DDLExecutor {
+ private static final Log LOG = LogFactory.getLog(DDLExecutor.class);
+
+ private final TajoMaster.MasterContext context;
+ private final CatalogService catalog;
+ private final StorageManager storageManager;
+
+ public DDLExecutor(TajoMaster.MasterContext context) {
+ this.context = context;
+ this.catalog = context.getCatalog();
+ this.storageManager = context.getStorageManager();
+ }
+
+ public boolean execute(QueryContext queryContext, LogicalPlan plan) throws IOException {
+ LogicalNode root = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();
+
+ switch (root.getType()) {
+
+ case ALTER_TABLESPACE:
+ AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+ alterTablespace(context, queryContext, alterTablespace);
+ return true;
+
+
+ case CREATE_DATABASE:
+ CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+ createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+ return true;
+ case DROP_DATABASE:
+ DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+ dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+ return true;
+
+
+ case CREATE_TABLE:
+ CreateTableNode createTable = (CreateTableNode) root;
+ createTable(queryContext, createTable, createTable.isIfNotExists());
+ return true;
+ case DROP_TABLE:
+ DropTableNode dropTable = (DropTableNode) root;
+ dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+ return true;
+ case TRUNCATE_TABLE:
+ TruncateTableNode truncateTable = (TruncateTableNode) root;
+ truncateTable(queryContext, truncateTable);
+ return true;
+
+ case ALTER_TABLE:
+ AlterTableNode alterTable = (AlterTableNode) root;
+ alterTable(context, queryContext, alterTable);
+ return true;
+
+ default:
+ throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
+ }
+ }
+
+ /**
+ * Alter a given table
+ */
+ public static void alterTablespace(final TajoMaster.MasterContext context, final QueryContext queryContext,
+ final AlterTablespaceNode alterTablespace) {
+
+ final CatalogService catalog = context.getCatalog();
+ final String spaceName = alterTablespace.getTablespaceName();
+
+ AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
+ builder.setSpaceName(spaceName);
+ if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
+ AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder =
+ AlterTablespaceProto.AlterTablespaceCommand.newBuilder();
+ commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
+ commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
+ commandBuilder.build();
+ builder.addCommand(commandBuilder);
+ } else {
+ throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
+ }
+
+ catalog.alterTablespace(builder.build());
+ }
+
+ //--------------------------------------------------------------------------
+
+ // Database Section
+ //--------------------------------------------------------------------------
+ public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName,
+ @Nullable String tablespace,
+ boolean ifNotExists) throws IOException {
+
+ String tablespaceName;
+ if (tablespace == null) {
+ tablespaceName = DEFAULT_TABLESPACE_NAME;
+ } else {
+ tablespaceName = tablespace;
+ }
+
+ // CREATE DATABASE IF NOT EXISTS
+ boolean exists = catalog.existDatabase(databaseName);
+ if (exists) {
+ if (ifNotExists) {
+ LOG.info("database \"" + databaseName + "\" is already exists." );
+ return true;
+ } else {
+ throw new AlreadyExistsDatabaseException(databaseName);
+ }
+ }
+
+ if (catalog.createDatabase(databaseName, tablespaceName)) {
+ String normalized = databaseName;
+ Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), normalized);
+ FileSystem fs = databaseDir.getFileSystem(context.getConf());
+ fs.mkdirs(databaseDir);
+ }
+
+ return true;
+ }
+
+ public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) {
+ boolean exists = catalog.existDatabase(databaseName);
+ if(!exists) {
+ if (ifExists) { // DROP DATABASE IF EXISTS
+ LOG.info("database \"" + databaseName + "\" does not exists." );
+ return true;
+ } else { // Otherwise, it causes an exception.
+ throw new NoSuchDatabaseException(databaseName);
+ }
+ }
+
+ if (queryContext.getCurrentDatabase().equals(databaseName)) {
+ throw new RuntimeException("ERROR: Cannot drop the current open database");
+ }
+
+ boolean result = catalog.dropDatabase(databaseName);
+ LOG.info("database " + databaseName + " is dropped.");
+ return result;
+ }
+
+ //--------------------------------------------------------------------------
+
+ // Table Section
+ //--------------------------------------------------------------------------
+ private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists)
+ throws IOException {
+ TableMeta meta;
+
+ if (createTable.hasOptions()) {
+ meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions());
+ } else {
+ meta = CatalogUtil.newTableMeta(createTable.getStorageType());
+ }
+
+ if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
+ Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+ }
+
+ return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
+ createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
+ createTable.getPartitionMethod(), ifNotExists);
+ }
+
+ public TableDesc createTable(QueryContext queryContext, String tableName, CatalogProtos.StoreType storeType,
+ Schema schema, TableMeta meta, Path path, boolean isExternal,
+ PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String [] splitted = CatalogUtil.splitFQTableName(tableName);
+ databaseName = splitted[0];
+ simpleTableName = splitted[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ boolean exists = catalog.existsTable(databaseName, simpleTableName);
+
+ if (exists) {
+ if (ifNotExists) {
+ LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+ return catalog.getTableDesc(databaseName, simpleTableName);
+ } else {
+ throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
+ }
+ }
+
+ TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
+ schema, meta, (path != null ? path.toUri(): null), isExternal);
+
+ if (partitionDesc != null) {
+ desc.setPartitionMethod(partitionDesc);
+ }
+
+ StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+
+ if (catalog.createTable(desc)) {
+ LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+ return desc;
+ } else {
+ LOG.info("Table creation " + tableName + " is failed.");
+ throw new CatalogException("Cannot create table \"" + tableName + "\".");
+ }
+ }
+
+ /**
+ * Drop a given named table
+ *
+ * @param tableName to be dropped
+ * @param purge Remove all data if purge is true.
+ */
+ public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) {
+ CatalogService catalog = context.getCatalog();
+
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String [] splitted = CatalogUtil.splitFQTableName(tableName);
+ databaseName = splitted[0];
+ simpleTableName = splitted[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ boolean exists = catalog.existsTable(qualifiedName);
+ if(!exists) {
+ if (ifExists) { // DROP TABLE IF EXISTS
+ LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+ return true;
+ } else { // Otherwise, it causes an exception.
+ throw new NoSuchTableException(qualifiedName);
+ }
+ }
+
+ TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
+ catalog.dropTable(qualifiedName);
+
+ if (purge) {
+ try {
+ StorageManager.getStorageManager(queryContext.getConf(),
+ tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
+ } catch (IOException e) {
+ throw new InternalError(e.getMessage());
+ }
+ }
+
+ LOG.info(String.format("relation \"%s\" is " + (purge ? " purged." : " dropped."), qualifiedName));
+ return true;
+ }
+
+ /**
+ * Truncate table a given table
+ */
+ public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode)
+ throws IOException {
+ List<String> tableNames = truncateTableNode.getTableNames();
+ final CatalogService catalog = context.getCatalog();
+
+ String databaseName;
+ String simpleTableName;
+
+ List<TableDesc> tableDescList = new ArrayList<TableDesc>();
+ for (String eachTableName: tableNames) {
+ if (CatalogUtil.isFQTableName(eachTableName)) {
+ String[] split = CatalogUtil.splitFQTableName(eachTableName);
+ databaseName = split[0];
+ simpleTableName = split[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleTableName = eachTableName;
+ }
+ final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ if (!catalog.existsTable(databaseName, simpleTableName)) {
+ throw new NoSuchTableException(qualifiedName);
+ }
+
+ Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
+ TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
+ Path tablePath = new Path(tableDesc.getPath());
+ if (tablePath.getParent() == null ||
+ !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
+ throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
+ ", warehouse dir=" + warehousePath);
+ }
+ tableDescList.add(tableDesc);
+ }
+
+ for (TableDesc eachTable: tableDescList) {
+ Path path = new Path(eachTable.getPath());
+ LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
+ FileSystem fs = path.getFileSystem(context.getConf());
+
+ FileStatus[] files = fs.listStatus(path);
+ if (files != null) {
+ for (FileStatus eachFile: files) {
+ fs.delete(eachFile.getPath(), true);
+ }
+ }
+ }
+ }
+
+ /**
+ * ALTER TABLE SET ...
+ */
+ public void alterTable(TajoMaster.MasterContext context, final QueryContext queryContext,
+ final AlterTableNode alterTable) throws IOException {
+
+ final CatalogService catalog = context.getCatalog();
+ final String tableName = alterTable.getTableName();
+
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String[] split = CatalogUtil.splitFQTableName(tableName);
+ databaseName = split[0];
+ simpleTableName = split[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ if (!catalog.existsTable(databaseName, simpleTableName)) {
+ throw new NoSuchTableException(qualifiedName);
+ }
+
+ switch (alterTable.getAlterTableOpType()) {
+ case RENAME_TABLE:
+ if (!catalog.existsTable(databaseName, simpleTableName)) {
+ throw new NoSuchTableException(alterTable.getTableName());
+ }
+ if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
+ throw new AlreadyExistsTableException(alterTable.getNewTableName());
+ }
+
+ TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
+
+ if (!desc.isExternal()) { // if the table is the managed table
+ Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+ databaseName, simpleTableName);
+ Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+ databaseName, alterTable.getNewTableName());
+ FileSystem fs = oldPath.getFileSystem(context.getConf());
+
+ if (!fs.exists(oldPath)) {
+ throw new IOException("No such a table directory: " + oldPath);
+ }
+ if (fs.exists(newPath)) {
+ throw new IOException("Already table directory exists: " + newPath);
+ }
+
+ fs.rename(oldPath, newPath);
+ }
+ catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
+ AlterTableType.RENAME_TABLE));
+ break;
+ case RENAME_COLUMN:
+ if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
+ throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
+ }
+ catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
+ alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
+ break;
+ case ADD_COLUMN:
+ if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
+ throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
+ }
+ catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
+ break;
+ default:
+ //TODO
+ }
+ }
+
+ private boolean existColumnName(String tableName, String columnName) {
+ final TableDesc tableDesc = catalog.getTableDesc(tableName);
+ return tableDesc.getSchema().containsByName(columnName) ? true : false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
new file mode 100644
index 0000000..3585ae7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.StoreTableExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.master.NonForwardQueryResultScanner;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.exec.prehook.CreateTableHook;
+import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
+import org.apache.tajo.master.exec.prehook.InsertIntoHook;
+import org.apache.tajo.master.querymaster.*;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class QueryExecutor {
+ private static final Log LOG = LogFactory.getLog(QueryExecutor.class);
+
+ private final TajoMaster.MasterContext context;
+ private final CatalogService catalog;
+ private final DistributedQueryHookManager hookManager;
+ private final DDLExecutor ddlExecutor;
+
+ public QueryExecutor(TajoMaster.MasterContext context, DDLExecutor ddlExecutor) {
+ this.context = context;
+ this.catalog = context.getCatalog();
+
+ this.ddlExecutor = ddlExecutor;
+ this.hookManager = new DistributedQueryHookManager();
+ this.hookManager.addHook(new CreateTableHook());
+ this.hookManager.addHook(new InsertIntoHook());
+ }
+
+ public SubmitQueryResponse execute(QueryContext queryContext, Session session, String sql, String jsonExpr,
+ LogicalPlan plan) throws Exception {
+
+ SubmitQueryResponse.Builder response = SubmitQueryResponse.newBuilder();
+ response.setIsForwarded(false);
+ response.setUserName(queryContext.get(SessionVars.USERNAME));
+
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ if (PlannerUtil.checkIfSetSession(rootNode)) {
+ execSetSession(session, plan, response);
+
+
+ } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
+ context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+ ddlExecutor.execute(queryContext, plan);
+ response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ response.setResultCode(ClientProtos.ResultCode.OK);
+
+
+ } else if (plan.isExplain()) { // explain query
+ execExplain(plan, response);
+
+
+ // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
+ } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
+ execSimpleQuery(queryContext, session, sql, plan, response);
+
+
+ // NonFromQuery indicates a form of 'select a, x+y;'
+ } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
+ execNonFromQuery(queryContext, session, sql, plan, response);
+
+
+ } else { // it requires distributed execution. So, the query is forwarded to a query master.
+ executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
+ }
+
+ response.setSessionVars(ProtoUtil.convertFromMap(session.getAllVariables()));
+
+ return response.build();
+ }
+
+ public void execSetSession(Session session, LogicalPlan plan,
+ SubmitQueryResponse.Builder response) {
+ SetSessionNode setSessionNode = ((LogicalRootNode)plan.getRootBlock().getRoot()).getChild();
+
+ final String varName = setSessionNode.getName();
+
+ // SET CATALOG 'XXX'
+ if (varName.equals(SessionVars.CURRENT_DATABASE.name())) {
+ String databaseName = setSessionNode.getValue();
+
+ if (catalog.existDatabase(databaseName)) {
+ session.selectDatabase(setSessionNode.getValue());
+ } else {
+ response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ response.setResultCode(ClientProtos.ResultCode.ERROR);
+ response.setErrorMessage("database \"" + databaseName + "\" does not exists.");
+ }
+
+ // others
+ } else {
+ if (setSessionNode.isDefaultValue()) {
+ session.removeVariable(varName);
+ } else {
+ session.setVariable(varName, setSessionNode.getValue());
+ }
+ }
+
+ context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+ response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ response.setResultCode(ClientProtos.ResultCode.OK);
+ }
+
+ public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
+
+ String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
+ Schema schema = new Schema();
+ schema.addColumn("explain", TajoDataTypes.Type.TEXT);
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+
+ ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
+
+ VTuple tuple = new VTuple(1);
+ String[] lines = explainStr.split("\n");
+ int bytesNum = 0;
+ for (String line : lines) {
+ tuple.put(0, DatumFactory.createText(line));
+ byte [] encodedData = encoder.toBytes(tuple);
+ bytesNum += encodedData.length;
+ serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
+ }
+ serializedResBuilder.setSchema(schema.getProto());
+ serializedResBuilder.setBytesNum(bytesNum);
+
+ response.setResultSet(serializedResBuilder.build());
+ response.setMaxRowNum(lines.length);
+ response.setResultCode(ClientProtos.ResultCode.OK);
+ response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ }
+
+ public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
+ SubmitQueryResponse.Builder response) throws Exception {
+ ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+ if (scanNode == null) {
+ scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+ }
+ TableDesc desc = scanNode.getTableDesc();
+ int maxRow = Integer.MAX_VALUE;
+ if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+ LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+ maxRow = (int) limitNode.getFetchFirstNum();
+ }
+ if (desc.getStats().getNumRows() == 0) {
+ desc.getStats().setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+ }
+
+ QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+ NonForwardQueryResultScanner queryResultScanner =
+ new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+
+ queryResultScanner.init();
+ session.addNonForwardQueryResultScanner(queryResultScanner);
+
+ response.setQueryId(queryId.getProto());
+ response.setMaxRowNum(maxRow);
+ response.setTableDesc(desc.getProto());
+ response.setResultCode(ClientProtos.ResultCode.OK);
+ }
+
+ public void execNonFromQuery(QueryContext queryContext, Session session, String query,
+ LogicalPlan plan, SubmitQueryResponse.Builder responseBuilder) throws Exception {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ Target[] targets = plan.getRootBlock().getRawTargets();
+ if (targets == null) {
+ throw new PlanningException("No targets");
+ }
+ final Tuple outTuple = new VTuple(targets.length);
+ for (int i = 0; i < targets.length; i++) {
+ EvalNode eval = targets[i].getEvalTree();
+ outTuple.put(i, eval.eval(null, null));
+ }
+ boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
+ if (isInsert) {
+ InsertNode insertNode = rootNode.getChild();
+ insertNonFromQuery(queryContext, insertNode, responseBuilder);
+ } else {
+ Schema schema = PlannerUtil.targetToSchema(targets);
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+ byte[] serializedBytes = encoder.toBytes(outTuple);
+ ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder();
+ serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+ serializedResBuilder.setSchema(schema.getProto());
+ serializedResBuilder.setBytesNum(serializedBytes.length);
+
+ responseBuilder.setResultSet(serializedResBuilder);
+ responseBuilder.setMaxRowNum(1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ }
+ }
+
+ private void insertNonFromQuery(QueryContext queryContext,
+ InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
+ throws Exception {
+ String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
+ String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+ FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+ Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+ TableDesc tableDesc = null;
+ Path finalOutputDir = null;
+ if (insertNode.getTableName() != null) {
+ tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+ finalOutputDir = new Path(tableDesc.getPath());
+ } else {
+ finalOutputDir = insertNode.getPath();
+ }
+
+ TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+ taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+
+ EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+ StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+ try {
+ exec.init();
+ exec.next();
+ } finally {
+ exec.close();
+ }
+
+ if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+ // it moves the original table into the temporary location.
+ // Then it moves the new result table into the original table location.
+ // Upon failed, it recovers the original table if possible.
+ boolean movedToOldTable = false;
+ boolean committed = false;
+ Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+ try {
+ if (fs.exists(finalOutputDir)) {
+ fs.rename(finalOutputDir, oldTableDir);
+ movedToOldTable = fs.exists(oldTableDir);
+ } else { // if the parent does not exist, make its parent directory.
+ fs.mkdirs(finalOutputDir.getParent());
+ }
+ fs.rename(stagingResultDir, finalOutputDir);
+ committed = fs.exists(finalOutputDir);
+ } catch (IOException ioe) {
+ // recover the old table
+ if (movedToOldTable && !committed) {
+ fs.rename(oldTableDir, finalOutputDir);
+ }
+ }
+ } else {
+ FileStatus[] files = fs.listStatus(stagingResultDir);
+ for (FileStatus eachFile: files) {
+ Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+ if (fs.exists(targetFilePath)) {
+ targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+ }
+ fs.rename(eachFile.getPath(), targetFilePath);
+ }
+ }
+
+ if (insertNode.hasTargetTable()) {
+ TableStats stats = tableDesc.getStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
+
+ CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+ builder.setTableName(tableDesc.getName());
+ builder.setStats(stats.getProto());
+
+ catalog.updateTableStats(builder.build());
+
+ responseBuilder.setTableDesc(tableDesc.getProto());
+ } else {
+ TableStats stats = new TableStats();
+ long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+ stats.setNumBytes(volume);
+ stats.setNumRows(1);
+
+ // Empty TableDesc
+ List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+ CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+ .setTableName(nodeUniqName)
+ .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build())
+ .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+ .setStats(stats.getProto())
+ .build();
+
+ responseBuilder.setTableDesc(tableDescProto);
+ }
+
+ // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+ responseBuilder.setMaxRowNum(-1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ }
+
+ public void executeDistributedQuery(QueryContext queryContext, Session session,
+ LogicalPlan plan,
+ String sql,
+ String jsonExpr,
+ SubmitQueryResponse.Builder responseBuilder) throws Exception {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ CatalogProtos.StoreType storeType = PlannerUtil.getStoreType(plan);
+ if (storeType != null) {
+ StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
+ StorageProperty storageProperty = sm.getStorageProperty();
+ if (!storageProperty.isSupportsInsertInto()) {
+ throw new VerifyException("Inserting into non-file storage is not supported.");
+ }
+ sm.beforeInsertOrCATS(rootNode.getChild());
+ }
+ context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
+ hookManager.doHooks(queryContext, plan);
+
+ QueryJobManager queryJobManager = this.context.getQueryJobManager();
+ QueryInfo queryInfo;
+
+ queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+
+ if(queryInfo == null) {
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+ responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+ LOG.error("Fail starting QueryMaster: " + sql);
+ } else {
+ responseBuilder.setIsForwarded(true);
+ responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ if(queryInfo.getQueryMasterHost() != null) {
+ responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ }
+ responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+ LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
+ " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
new file mode 100644
index 0000000..5e3d0b6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.CreateTableNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.storage.StorageUtil;
+
+public class CreateTableHook implements DistributedQueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
+ }
+
+ @Override
+ public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ CreateTableNode createTableNode = rootNode.getChild();
+ String [] splitted = CatalogUtil.splitFQTableName(createTableNode.getTableName());
+ String databaseName = splitted[0];
+ String tableName = splitted[1];
+ queryContext.setOutputTable(tableName);
+ queryContext.setOutputPath(
+ StorageUtil.concatPath(TajoConf.getWarehouseDir(queryContext.getConf()), databaseName, tableName));
+ if(createTableNode.getPartitionMethod() != null) {
+ queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
+ }
+ queryContext.setCreateTable();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
new file mode 100644
index 0000000..77c6ff4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHook.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+
+public interface DistributedQueryHook {
+ boolean isEligible(QueryContext queryContext, LogicalPlan plan);
+ void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
new file mode 100644
index 0000000..3dba176
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DistributedQueryHookManager {
+ private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
+
+ public void addHook(DistributedQueryHook hook) {
+ hooks.add(hook);
+ }
+
+ public void doHooks(QueryContext queryContext, LogicalPlan plan) {
+ for (DistributedQueryHook hook : hooks) {
+ if (hook.isEligible(queryContext, plan)) {
+ try {
+ hook.hook(queryContext, plan);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a4c34842/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
new file mode 100644
index 0000000..14c4d8d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec.prehook;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.NodeType;
+
+public class InsertIntoHook implements DistributedQueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+ return plan.getRootBlock().getRootType() == NodeType.INSERT;
+ }
+
+ @Override
+ public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+ queryContext.setInsert();
+
+ InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
+
+ // Set QueryContext settings, such as output table name and output path.
+ // It also remove data files if overwrite is true.
+ Path outputPath;
+ if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
+ queryContext.setOutputTable(insertNode.getTableName());
+ queryContext.setOutputPath(insertNode.getPath());
+ if (insertNode.hasPartition()) {
+ queryContext.setPartitionMethod(insertNode.getPartitionMethod());
+ }
+ } else { // INSERT INTO LOCATION ...
+ // When INSERT INTO LOCATION, must not set output table.
+ outputPath = insertNode.getPath();
+ queryContext.setFileOutput();
+ queryContext.setOutputPath(outputPath);
+ }
+
+ if (insertNode.isOverwrite()) {
+ queryContext.setOutputOverwrite();
+ }
+
+ }
+}