You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/04 17:51:14 UTC
[2/3] TAJO-144: Implement INSERT OVERWRITE clause. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
new file mode 100644
index 0000000..6655317
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMeta.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
+
+public class QueryMeta extends Options { // String key/value options describing a query: user, output table/path, output flags, command kind.
+ // No-argument constructor; relies on the implicit Options() constructor.
+ public QueryMeta() {}
+ // Builds the instance from a KeyValueSetProto by handing it to the Options constructor.
+ public QueryMeta(KeyValueSetProto proto) {
+ super(proto);
+ }
+ // Stores value under the configuration variable's name (key.varname).
+ public void put(TajoConf.ConfVars key, String value) {
+ put(key.varname, value);
+ }
+ // Looks up the value stored under the configuration variable's name (key.varname).
+ public String get(TajoConf.ConfVars key) {
+ return get(key.varname);
+ }
+ // Plain string-keyed lookup; delegates directly to Options.get(String).
+ public String get(String key) {
+ return super.get(key);
+ }
+ // Records the user name under ConfVars.QUERY_USERNAME.
+ public void setUser(String username) {
+ put(TajoConf.ConfVars.QUERY_USERNAME, username);
+ }
+ // Returns whatever is stored under ConfVars.QUERY_USERNAME.
+ public String getUser() {
+ return get(TajoConf.ConfVars.QUERY_USERNAME);
+ }
+ // Records the output table name under ConfVars.QUERY_OUTPUT_TABLE after PlannerUtil.normalizeTableName().
+ public void setOutputTable(String tableName) {
+ put(TajoConf.ConfVars.QUERY_OUTPUT_TABLE, PlannerUtil.normalizeTableName(tableName));
+ }
+ // Returns the (already normalized) value stored under ConfVars.QUERY_OUTPUT_TABLE.
+ public String getOutputTable() {
+ return get(TajoConf.ConfVars.QUERY_OUTPUT_TABLE);
+ }
+ // Records the output directory under ConfVars.QUERY_OUTPUT_DIR as the string form of path.toUri().
+ public void setOutputPath(Path path) {
+ // it is determined in QueryMaster.initStagingDir().
+ put(TajoConf.ConfVars.QUERY_OUTPUT_DIR, path.toUri().toString());
+ }
+ // Rebuilds a Path from the string stored under ConfVars.QUERY_OUTPUT_DIR.
+ public Path getOutputPath() {
+ return new Path(get(TajoConf.ConfVars.QUERY_OUTPUT_DIR)); // NOTE(review): presumably fails if the key was never set (null lookup) - confirm callers set the path first
+ }
+ // Marks the output as overwritable by storing "1" under "tajo.output.overwrite".
+ public void setOutputOverwrite() {
+ put("tajo.output.overwrite", "1");
+ }
+ // True only when the value under "tajo.output.overwrite" is exactly "1"; a null lookup yields false.
+ public boolean isOutputOverwrite() {
+ String overwrite = get("tajo.output.overwrite");
+ if (overwrite != null && overwrite.equals("1")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ // Marks the query as file output by storing "1" under "tajo.output.fileoutput".
+ public void setFileOutput() {
+ put("tajo.output.fileoutput", "1");
+ }
+ // True only when the value under "tajo.output.fileoutput" is exactly "1"; a null lookup yields false.
+ public boolean isFileOutput() {
+ String fileoutput = get("tajo.output.fileoutput");
+ if (fileoutput != null && fileoutput.equals("1")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ // Records the command kind as CreateTableNode's simple class name; uses the same "tajo.query.command" key as setInsert().
+ public void setCreateTable() {
+ put("tajo.query.command", CreateTableNode.class.getSimpleName());
+ }
+ // True when "tajo.query.command" holds CreateTableNode's simple class name; a null lookup yields false.
+ public boolean isCreateTable() {
+ String command = get("tajo.query.command");
+ return command != null && command.equals(CreateTableNode.class.getSimpleName());
+ }
+ // Records the command kind as InsertNode's simple class name; uses the same "tajo.query.command" key as setCreateTable().
+ public void setInsert() {
+ put("tajo.query.command", InsertNode.class.getSimpleName());
+ }
+ // True when "tajo.query.command" holds InsertNode's simple class name; a null lookup yields false.
+ public boolean isInsert() {
+ String command = get("tajo.query.command");
+ return command != null && command.equals(InsertNode.class.getSimpleName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 9b454f6..b26d05b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -118,7 +118,6 @@ public class TaskSchedulerImpl extends AbstractService
schedule();
}
- //req.getCallback().run(stopTaskRunnerReq);
LOG.info("TaskScheduler schedulingThread stopped");
}
};
@@ -459,7 +458,8 @@ public class TaskSchedulerImpl extends AbstractService
new ArrayList<Fragment>(task.getAllFragments()),
task.getOutputName(),
false,
- task.getLogicalPlan().toJson());
+ task.getLogicalPlan().toJson(),
+ context.getQueryMeta());
if (task.getStoreTableNode().isLocal()) {
taskAssign.setInterQuery();
}
@@ -503,7 +503,8 @@ public class TaskSchedulerImpl extends AbstractService
Lists.newArrayList(task.getAllFragments()),
task.getOutputName(),
false,
- task.getLogicalPlan().toJson());
+ task.getLogicalPlan().toJson(),
+ context.getQueryMeta());
if (task.getStoreTableNode().isLocal()) {
taskAssign.setInterQuery();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 6ae8ff7..54cb0fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -20,18 +20,24 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.QueryId;
+import org.apache.tajo.master.QueryMeta;
+/**
+ * This event is conveyed to QueryMaster.
+ */
public class QueryStartEvent extends AbstractEvent {
public enum EventType {
QUERY_START
}
private QueryId queryId;
+ private QueryMeta queryMeta;
private String logicalPlanJson;
- public QueryStartEvent(QueryId queryId, String logicalPlanJson) {
+ public QueryStartEvent(QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
super(EventType.QUERY_START);
this.queryId = queryId;
+ this.queryMeta = queryMeta;
this.logicalPlanJson = logicalPlanJson;
}
@@ -39,6 +45,10 @@ public class QueryStartEvent extends AbstractEvent {
return queryId;
}
+ public QueryMeta getQueryMeta() { // Returns the QueryMeta that was passed to the QueryStartEvent constructor.
+ return this.queryMeta;
+ }
+
public String getLogicalPlanJson() {
return logicalPlanJson;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 99b7c62..d1e92be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -29,9 +29,13 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableDescImpl;
+import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.master.ExecutionBlock;
import org.apache.tajo.master.ExecutionBlockCursor;
import org.apache.tajo.master.event.*;
@@ -57,7 +61,7 @@ public class Query implements EventHandler<QueryEvent> {
private final EventHandler eventHandler;
private final MasterPlan plan;
private final StorageManager sm;
- private QueryMasterTask.QueryContext context;
+ QueryMasterTask.QueryContext context;
private ExecutionBlockCursor cursor;
// Query Status
@@ -305,19 +309,29 @@ public class Query implements EventHandler<QueryEvent> {
} else { // Finish a query
if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
- TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
- subQuery.getTableMeta(), query.context.getOutputPath());
- query.setResultDesc(desc);
- try {
- query.writeStat(query.context.getOutputPath(), subQuery);
- } catch (IOException e) {
- e.printStackTrace();
+ TableDesc outputTableDesc = new TableDescImpl(query.context.getQueryMeta().getOutputTable(),
+ subQuery.getTableMeta(), query.context.getQueryMeta().getOutputPath());
+ query.setResultDesc(outputTableDesc);
+
+ if (!query.context.getQueryMeta().isFileOutput()) {
+ try {
+ query.writeStat(query.context.getQueryMeta().getOutputPath(), subQuery);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
query.eventHandler.handle(new QueryFinishEvent(query.getId()));
- if (query.context.isCreateTableQuery()) {
- // TOOD move to QueryJobManager
- //query.context.getCatalog().addTable(desc);
+ StoreTableNode storeTableNode = (StoreTableNode) PlannerUtil.findTopNode(subQuery.getBlock().getPlan(),
+ NodeType.STORE);
+ if (storeTableNode.isCreatedTable()) {
+ query.context.getQueryMasterContext().getWorkerContext().getCatalog().addTable(outputTableDesc);
+ } else if (storeTableNode.isOverwrite() && !query.context.getQueryMeta().isFileOutput()) {
+ CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+ TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
+ updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
+ catalog.deleteTable(outputTableDesc.getName());
+ catalog.addTable(updatingTable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 10be0a3..f6cf02e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,6 +29,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.QueryMeta;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
@@ -45,6 +46,8 @@ public class QueryInProgress extends CompositeService {
private QueryId queryId;
+ private QueryMeta queryMeta;
+
private TajoAsyncDispatcher dispatcher;
private LogicalRootNode plan;
@@ -63,9 +66,11 @@ public class QueryInProgress extends CompositeService {
public QueryInProgress(
TajoMaster.MasterContext masterContext,
+ QueryMeta queryMeta,
QueryId queryId, String sql, LogicalRootNode plan) {
super(QueryInProgress.class.getName());
this.masterContext = masterContext;
+ this.queryMeta = queryMeta;
this.queryId = queryId;
this.plan = plan;
@@ -195,12 +200,13 @@ public class QueryInProgress extends CompositeService {
//TODO wait
return;
}
- LOG.info("====>Call executeQuery to :" +
+ LOG.info("Call executeQuery to :" +
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
queryMasterRpcClient.executeQuery(
null,
TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
.setQueryId(queryId.getProto())
+ .setQueryMeta(queryMeta.getProto())
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
.build(), NullCallback.get());
querySubmitted.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 16008fc..fb92616 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.QueryMeta;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
@@ -84,9 +85,9 @@ public class QueryJobManager extends CompositeService {
return dispatcher.getEventHandler();
}
- public QueryInfo createNewQueryJob(String sql, LogicalRootNode plan) throws Exception {
+ public QueryInfo createNewQueryJob(QueryMeta queryMeta, String sql, LogicalRootNode plan) throws Exception {
QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryId, sql, plan);
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext,queryMeta, queryId, sql, plan);
synchronized(runningQueries) {
runningQueries.put(queryId, queryInProgress);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 50ec5be..53b9c05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -60,7 +60,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
private GlobalOptimizer globalOptimizer;
-// private boolean isCreateTableStmt;
private StorageManager storageManager;
private QueryConf queryConf;
@@ -88,7 +87,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
queryConf = new QueryConf(conf);
queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
- QUERY_SESSION_TIMEOUT = 60 * 1000;//queryConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+ QUERY_SESSION_TIMEOUT = 60 * 1000;
queryMasterContext = new QueryMasterContext(queryConf);
clock = new SystemClock();
@@ -112,7 +111,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
@Override
public void start() {
- LOG.info("====>QueryMaster start");
+ LOG.info("QueryMaster start");
queryHeartbeatThread = new QueryHeartbeatThread();
queryHeartbeatThread.start();
@@ -253,10 +252,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
@Override
public void handle(QueryStartEvent event) {
- LOG.info("====>Start QueryStartEventHandler:" + event.getQueryId());
+ LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
//To change body of implemented methods use File | Settings | File Templates.
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getQueryMeta(), event.getLogicalPlanJson());
queryMasterTask.init(queryConf);
queryMasterTask.start();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index cfea8a2..f348e44 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -39,6 +39,7 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.master.QueryMeta;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -64,6 +65,8 @@ public class QueryMasterTask extends CompositeService {
private QueryId queryId;
+ private QueryMeta queryMeta;
+
private QueryContext queryContext;
private QueryMaster.QueryMasterContext queryMasterContext;
@@ -78,8 +81,6 @@ public class QueryMasterTask extends CompositeService {
private final long querySubmitTime;
- private boolean isCreateTableStmt;
-
private Path outputPath;
private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
@@ -93,10 +94,11 @@ public class QueryMasterTask extends CompositeService {
private AtomicBoolean stopped = new AtomicBoolean(false);
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, String logicalPlanJson) {
+ QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
this.queryId = queryId;
+ this.queryMeta = queryMeta;
this.logicalPlanJson = logicalPlanJson;
this.querySubmitTime = System.currentTimeMillis();
}
@@ -269,13 +271,12 @@ public class QueryMasterTask extends CompositeService {
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- String givenOutputTableName = queryConf.getOutputTable();
+ String givenOutputTableName = queryMeta.getOutputTable();
Path stagingDir;
// If final output directory is not given by an user,
// we use the query id as a output directory.
if (givenOutputTableName == null || givenOutputTableName.isEmpty()) {
- this.isCreateTableStmt = false;
FileSystem defaultFS = FileSystem.get(queryConf);
Path homeDirectory = defaultFS.getHomeDirectory();
@@ -318,25 +319,31 @@ public class QueryMasterTask extends CompositeService {
}
// Set the query id to the output table name
- queryConf.setOutputTable(queryId.toString());
+ queryMeta.setOutputTable(queryId.toString());
+
+ } else { // if a output table is given
- } else {
- this.isCreateTableStmt = true;
Path warehouseDir = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR),
TajoConstants.WAREHOUSE_DIR);
- stagingDir = new Path(warehouseDir, queryConf.getOutputTable());
-
FileSystem fs = warehouseDir.getFileSystem(queryConf);
- if (fs.exists(stagingDir)) {
- throw new IOException("The staging directory " + stagingDir
- + " already exists. The directory must be unique to each query");
+
+ if (queryMeta.isFileOutput()) {
+ stagingDir = queryMeta.getOutputPath();
} else {
- // TODO - should have appropriate permission
- fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ stagingDir = new Path(warehouseDir, queryMeta.getOutputTable());
}
+
+ if (!queryMeta.isOutputOverwrite()) {
+ if (fs.exists(stagingDir)) {
+ throw new IOException("The staging directory " + stagingDir
+ + " already exists. The directory must be unique to each query");
+ }
+ }
+
+ fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
}
- queryConf.setOutputPath(stagingDir);
+ queryMeta.setOutputPath(stagingDir);
outputPath = stagingDir;
LOG.info("Initialized Query Staging Dir: " + outputPath);
}
@@ -383,6 +390,10 @@ public class QueryMasterTask extends CompositeService {
return queryMasterContext;
}
+ public QueryMeta getQueryMeta() { // Exposes the task's queryMeta; the staging-dir setup in this file sets its output table/path on the same instance.
+ return queryMeta;
+ }
+
public QueryConf getConf() {
return queryConf;
}
@@ -407,10 +418,6 @@ public class QueryMasterTask extends CompositeService {
return outputPath;
}
- public boolean isCreateTableQuery() {
- return isCreateTableStmt;
- }
-
public synchronized EventHandler getEventHandler() {
if(eventHandler == null) {
eventHandler = dispatcher.getEventHandler();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 89d3fed..ecde59c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -46,11 +46,8 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.*;
import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.TaskScheduler;
-import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageManager;
@@ -72,6 +69,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static final Log LOG = LogFactory.getLog(SubQuery.class);
+ private QueryMeta queryMeta;
private ExecutionBlock block;
private int priority;
private TableMeta meta;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index e6ec9c0..1e9d6e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -189,7 +189,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
if(!fs.exists(queryConfPath)){
- LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConf.getOutputPath());
+ LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConfPath);
writeConf(queryConf, queryConfPath);
} else {
LOG.warn("QueryConf already exist. path: " + queryConfPath.toString());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index bb27ee4..0fc896b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -21,13 +21,14 @@ package org.apache.tajo.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -36,11 +37,13 @@ import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.webapp.StaticHttpServer;
import java.io.BufferedReader;
import java.io.File;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -64,11 +67,15 @@ public class TajoWorker extends CompositeService {
private TajoWorkerManagerService tajoWorkerManagerService;
+ private InetSocketAddress tajoMasterAddress;
+
//to TajoMaster
private ProtoAsyncRpcClient tajoMasterRpc;
private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
+ private CatalogClient catalogClient;
+
private WorkerContext workerContext;
private TaskRunnerManager taskRunnerManager;
@@ -171,17 +178,14 @@ public class TajoWorker extends CompositeService {
workerHeartbeatThread.interrupt();
}
-// try {
-// FileSystem.closeAll();
-// } catch (IOException e) {
-// LOG.error(e.getMessage(), e);
-// }
+ if (catalogClient != null) {
+ catalogClient.close();
+ }
+
if(tajoMasterRpc != null) {
tajoMasterRpc.close();
}
-// for(Service eachService: getServices()) {
-// System.out.println("Service:" + eachService);
-// }
+
super.stop();
LOG.info("TajoWorker main thread exiting");
}
@@ -207,6 +211,10 @@ public class TajoWorker extends CompositeService {
return taskRunnerManager;
}
+ public CatalogService getCatalog() { // Returns the worker's catalogClient field; NOTE(review): looks like it is null until connectToCatalog() succeeds - confirm.
+ return catalogClient;
+ }
+
public TajoPullServerService getPullService() {
return pullService;
}
@@ -246,9 +254,10 @@ public class TajoWorker extends CompositeService {
private void setWorkerMode(String[] params) {
if("qm".equals(daemonMode)) {
//QueryMaster mode
- String tajoMasterAddress = params[2];
+ String tajoMasterAddress = params[2];
connectToTajoMaster(tajoMasterAddress);
+ connectToCatalog();
QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
@@ -259,21 +268,24 @@ public class TajoWorker extends CompositeService {
} else {
//Standby mode
connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+ connectToCatalog();
workerHeartbeatThread = new WorkerHeartbeatThread();
workerHeartbeatThread.start();
}
}
- private void connectToTajoMaster(String tajoMasterAddress) {
- LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
- InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+ private void connectToTajoMaster(String tajoMasterAddrString) {
+ LOG.info("Connecting to TajoMaster (" + tajoMasterAddrString +")");
+ this.tajoMasterAddress = NetUtils.createSocketAddr(tajoMasterAddrString);
+
while(true) {
try {
- tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
+ tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
tajoMasterRpcClient = tajoMasterRpc.getStub();
break;
} catch (Exception e) {
- LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
+ LOG.error("Can't connect to TajoMaster[" + NetUtils.normalizeInetSocketAddress(tajoMasterAddress) + "], "
+ + e.getMessage(), e);
}
try {
@@ -283,6 +295,17 @@ public class TajoWorker extends CompositeService {
}
}
+ private void connectToCatalog() { // Creates catalogClient from the TajoMaster host name and the port part of ConfVars.CATALOG_ADDRESS.
+ // TODO: To be improved. It's a hack: it assumes that the CatalogServer is embedded in TajoMaster.
+ String hostName = this.tajoMasterAddress.getHostName(); // tajoMasterAddress is assigned in connectToTajoMaster(), which setWorkerMode() calls first
+ int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]); // second ':'-separated field is taken as the port; NOTE(review): assumes a "host:port" value - confirm
+ try {
+ catalogClient = new CatalogClient(hostName, port);
+ } catch (IOException e) {
+ e.printStackTrace(); // NOTE(review): the failure is only printed and catalogClient is not assigned on this path - consider LOG.error and failing fast
+ }
+ }
+
class WorkerHeartbeatThread extends Thread {
TajoMasterProtocol.ServerStatusProto.System systemInfo;
List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 3969b36..e1fd88a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -100,11 +100,11 @@ public class TajoWorkerClientService extends AbstractService {
@Override
public void stop() {
- LOG.info("====> TajoWorkerClientService stopping");
+ LOG.info("TajoWorkerClientService stopping");
if(rpcServer != null) {
rpcServer.shutdown();
}
- LOG.info("====> TajoWorkerClientService stopped");
+ LOG.info("TajoWorkerClientService stopped");
super.stop();
}
@@ -174,7 +174,10 @@ public class TajoWorkerClientService extends AbstractService {
builder.setProgress(query.getProgress());
builder.setSubmitTime(query.getAppSubmitTime());
builder.setInitTime(query.getInitializationTime());
- builder.setHasResult(!queryMasterTask.getQueryContext().isCreateTableQuery());
+ builder.setHasResult(
+ !(queryMasterTask.getQueryContext().getQueryMeta().isCreateTable() ||
+ queryMasterTask.getQueryContext().getQueryMeta().isInsert())
+ );
if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
builder.setFinishTime(query.getFinishTime());
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index a48339a..6fde6e4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,6 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.QueryMeta;
import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -199,8 +200,9 @@ public class TajoWorkerManagerService extends CompositeService
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
QueryId queryId = new QueryId(request.getQueryId());
- LOG.info("====>Receive executeQuery request:" + queryId);
- queryMaster.handle(new QueryStartEvent(queryId, request.getLogicalPlanJson().getValue()));
+ LOG.info("Receive executeQuery request:" + queryId);
+ queryMaster.handle(new QueryStartEvent(queryId,
+ new QueryMeta(request.getQueryMeta()), request.getLogicalPlanJson().getValue()));
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index f9cc82c..aef5ead 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -48,6 +48,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.master.QueryMeta;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageUtil;
@@ -69,6 +70,7 @@ public class Task {
private static final Log LOG = LogFactory.getLog(Task.class);
private final QueryConf conf;
+ private final QueryMeta queryMeta;
private final FileSystem localFS;
private final TaskRunner.TaskRunnerContext taskRunnerContext;
private final Interface masterProxy;
@@ -136,6 +138,7 @@ public class Task {
this.taskId = request.getId();
this.conf = worker.getQueryConf();
+ this.queryMeta = request.getQueryMeta();
this.taskRunnerContext = worker;
this.masterProxy = masterProxy;
this.localFS = worker.getLocalFS();
@@ -160,7 +163,7 @@ public class Task {
} else {
// The final result of a task will be written in a file named part-ss-nnnnnnn,
// where ss is the subquery id associated with this task, and nnnnnn is the task id.
- Path outFilePath = new Path(conf.getOutputPath(),
+ Path outFilePath = new Path(queryMeta.getOutputPath(),
OUTPUT_FILE_PREFIX +
OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 8493be2..5802ade 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -39,7 +39,6 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -106,20 +105,6 @@ public class TaskRunner extends AbstractService {
private TaskRunnerManager taskRunnerManager;
- public TaskRunner(
- final ExecutionBlockId executionBlockId,
- final NodeId nodeId,
- UserGroupInformation taskOwner,
- Interface master, ContainerId containerId) {
- super(TaskRunner.class.getName());
- this.executionBlockId = executionBlockId;
- this.queryId = executionBlockId.getQueryId();
- this.nodeId = nodeId;
- this.taskOwner = taskOwner;
- this.master = master;
- this.containerId = containerId;
- }
-
public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
super(TaskRunner.class.getName());
@@ -127,10 +112,7 @@ public class TaskRunner extends AbstractService {
try {
final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
- conf.setOutputPath(new Path(args[6]));
-
LOG.info("NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
- LOG.info("OUTPUT DIR: " + conf.getOutputPath());
LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
UserGroupInformation.setConfiguration(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 88a2029..0694b4e 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -59,6 +59,7 @@ message QueryUnitRequestProto {
optional bool interQuery = 6 [default = false];
repeated Fetch fetches = 7;
optional bool shouldDie = 8;
+ optional KeyValueSetProto queryMeta = 9;
}
message Fetch {
@@ -104,7 +105,8 @@ message Partition {
message QueryExecutionRequestProto {
required QueryIdProto queryId = 1;
- required StringProto logicalPlanJson = 2;
+ required KeyValueSetProto queryMeta = 2;
+ required StringProto logicalPlanJson = 3;
}
message GetTaskRequestProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index f031938..ab8d4b4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -133,7 +133,7 @@ public class TajoTestingCluster {
String dirStr = getTestDir(randomStr).toString();
File dir = new File(dirStr).getAbsoluteFile();
// Have it cleaned up on exit
- //dir.deleteOnExit();
+ dir.deleteOnExit();
return dir;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 2efaadc..e61a415 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.Tuple;
@@ -124,7 +125,7 @@ public class TestEvalTree {
"select name from people where NOT (20 > 30)", // 5
};
- public static Target[] getRawTargets(String query) {
+ public static Target[] getRawTargets(String query) throws PlanningException {
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(expr);
Target [] targets = plan.getRootBlock().getTargetListManager().getUnEvaluatedTargets();
@@ -136,7 +137,12 @@ public class TestEvalTree {
public static EvalNode getRootSelection(String query) {
Expr block = analyzer.parse(query);
- LogicalPlan plan = planner.createPlan(block);
+ LogicalPlan plan = null;
+ try {
+ plan = planner.createPlan(block);
+ } catch (PlanningException e) {
+ e.printStackTrace();
+ }
EvalNode qual = plan.getRootBlock().getSelectionNode().getQual();
assertJsonSerDer(qual);
return qual;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 1a24ec8..9b974cc 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -31,6 +31,7 @@ import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.EvalExprNode;
import org.apache.tajo.engine.planner.logical.NodeType;
@@ -96,7 +97,12 @@ public class TestEvalTreeUtil {
public static Target [] getRawTargets(String query) {
Expr expr = analyzer.parse(query);
- LogicalPlan plan = planner.createPlan(expr);
+ LogicalPlan plan = null;
+ try {
+ plan = planner.createPlan(expr);
+ } catch (PlanningException e) {
+ e.printStackTrace();
+ }
if (plan.getRootBlock().getRoot().getType() == NodeType.EXPRS) {
return ((EvalExprNode)plan.getRootBlock().getRoot()).getExprs();
} else {
@@ -106,7 +112,12 @@ public class TestEvalTreeUtil {
public static EvalNode getRootSelection(String query) {
Expr block = analyzer.parse(query);
- LogicalPlan plan = planner.createPlan(block);
+ LogicalPlan plan = null;
+ try {
+ plan = planner.createPlan(block);
+ } catch (PlanningException e) {
+ e.printStackTrace();
+ }
return plan.getRootBlock().getSelectionNode().getQual();
}
@@ -173,7 +184,7 @@ public class TestEvalTreeUtil {
}
@Test
- public final void testGetContainExprs() throws CloneNotSupportedException {
+ public final void testGetContainExprs() throws CloneNotSupportedException, PlanningException {
Expr expr = analyzer.parse(QUERIES[1]);
LogicalPlan plan = planner.createPlan(expr);
Target [] targets = plan.getRootBlock().getTargetListManager().getUnEvaluatedTargets();
@@ -236,7 +247,7 @@ public class TestEvalTreeUtil {
}
@Test
- public final void testSimplify() {
+ public final void testSimplify() throws PlanningException {
Target [] targets = getRawTargets(QUERIES[0]);
EvalNode node = AlgebraicUtil.simplify(targets[0].getEvalTree());
EvalContext nodeCtx = node.newContext();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 5ba26f8..75a531f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -78,18 +78,6 @@ public class TestSQLAnalyzer {
}
@Test
- public void testInsert1() throws IOException {
- String sql = FileUtil.readTextFile(new File("src/test/queries/insert_1.sql"));
- parseQuery(sql);
- }
-
- @Test
- public void testInsert2() throws IOException {
- String sql = FileUtil.readTextFile(new File("src/test/queries/insert_2.sql"));
- parseQuery(sql);
- }
-
- @Test
public void testGroupby1() throws IOException {
String sql = FileUtil.readTextFile(new File("src/test/queries/groupby_1.sql"));
parseQuery(sql);
@@ -251,6 +239,42 @@ public class TestSQLAnalyzer {
parseQuery(sql);
}
+ @Test
+ public void testInsertIntoTable() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_1.sql"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testInsertIntoLocation() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_2.sql"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testInsertIntoTable2() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_3.sql"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testInsertOverwriteIntoTable() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_1.sql"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testInsertOverwriteIntoLocation() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_2.sql"));
+ parseQuery(sql);
+ }
+
+ @Test
+ public void testInsertOverwriteIntoTable2() throws IOException {
+ String sql = FileUtil.readTextFile(new File("src/test/queries/insert_overwrite_into_select_3.sql"));
+ parseQuery(sql);
+ }
+
static String[] exprs = {
"1 + 2", // 0
"3 - 4", // 1
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 7c0b9d0..72cbfc2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -242,4 +242,11 @@ public class TestLogicalOptimizer {
// Test for Join Node
assertTrue(PlannerUtil.canBeEvaluated(selNode.getQual(), scanNode));
}
+
+ @Test
+ public final void testInsertInto() throws CloneNotSupportedException, PlanningException {
+ Expr expr = sqlAnalyzer.parse(TestLogicalPlanner.insertStatements[0]);
+ LogicalPlan newPlan = planner.createPlan(expr);
+ optimizer.optimize(newPlan);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 980f435..7f95a85 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -44,8 +44,7 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestLogicalPlanner {
private static TajoTestingCluster util;
@@ -62,7 +61,7 @@ public class TestLogicalPlanner {
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
catalog.registerFunction(funcDesc);
}
-
+
Schema schema = new Schema();
schema.addColumn("name", Type.TEXT);
schema.addColumn("empid", Type.INT4);
@@ -98,7 +97,7 @@ public class TestLogicalPlanner {
// TPC-H Schema for Complex Queries
String [] tpchTables = {
- "part", "supplier", "partsupp", "nation", "region"
+ "part", "supplier", "partsupp", "nation", "region", "lineitem"
};
tpch = new TPCH();
tpch.loadSchemas();
@@ -136,7 +135,7 @@ public class TestLogicalPlanner {
};
@Test
- public final void testSingleRelation() throws CloneNotSupportedException {
+ public final void testSingleRelation() throws CloneNotSupportedException, PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[0]);
LogicalPlan planNode = planner.createPlan(expr);
LogicalNode plan = planNode.getRootBlock().getRoot();
@@ -168,7 +167,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testImplicityJoinPlan() throws CloneNotSupportedException {
+ public final void testImplicityJoinPlan() throws CloneNotSupportedException, PlanningException {
// two relations
Expr expr = sqlAnalyzer.parse(QUERIES[1]);
LogicalPlan planNode = planner.createPlan(expr);
@@ -264,7 +263,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testNaturalJoinPlan() {
+ public final void testNaturalJoinPlan() throws PlanningException {
// two relations
Expr context = sqlAnalyzer.parse(JOINS[0]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
@@ -300,7 +299,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testInnerJoinPlan() {
+ public final void testInnerJoinPlan() throws PlanningException {
// two relations
Expr expr = sqlAnalyzer.parse(JOINS[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -336,7 +335,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testOuterJoinPlan() {
+ public final void testOuterJoinPlan() throws PlanningException {
// two relations
Expr expr = sqlAnalyzer.parse(JOINS[2]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -374,7 +373,7 @@ public class TestLogicalPlanner {
@Test
- public final void testGroupby() throws CloneNotSupportedException {
+ public final void testGroupby() throws CloneNotSupportedException, PlanningException {
// without 'having clause'
Expr context = sqlAnalyzer.parse(QUERIES[7]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
@@ -412,7 +411,7 @@ public class TestLogicalPlanner {
@Test
- public final void testMultipleJoin() throws IOException {
+ public final void testMultipleJoin() throws IOException, PlanningException {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File("src/test/queries/tpch_q2_simplified.tql")));
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -444,7 +443,7 @@ public class TestLogicalPlanner {
@Test
- public final void testStoreTable() throws CloneNotSupportedException {
+ public final void testStoreTable() throws CloneNotSupportedException, PlanningException {
Expr context = sqlAnalyzer.parse(QUERIES[8]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
TestLogicalNode.testCloneLogicalNode(plan);
@@ -460,7 +459,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testOrderBy() throws CloneNotSupportedException {
+ public final void testOrderBy() throws CloneNotSupportedException, PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[4]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -487,7 +486,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testLimit() throws CloneNotSupportedException {
+ public final void testLimit() throws CloneNotSupportedException, PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[12]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -506,7 +505,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testSPJPush() throws CloneNotSupportedException {
+ public final void testSPJPush() throws CloneNotSupportedException, PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[5]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -535,7 +534,7 @@ public class TestLogicalPlanner {
@Test
- public final void testSPJ() throws CloneNotSupportedException {
+ public final void testSPJ() throws CloneNotSupportedException, PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[6]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -543,7 +542,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testJson() {
+ public final void testJson() throws PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[9]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -560,7 +559,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testVisitor() {
+ public final void testVisitor() throws PlanningException {
// two relations
Expr expr = sqlAnalyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -585,7 +584,7 @@ public class TestLogicalPlanner {
@Test
- public final void testExprNode() {
+ public final void testExprNode() throws PlanningException {
Expr expr = sqlAnalyzer.parse(QUERIES[10]);
LogicalPlan rootNode = planner.createPlan(expr);
LogicalNode plan = rootNode.getRootBlock().getRoot();
@@ -611,7 +610,7 @@ public class TestLogicalPlanner {
@Test
- public final void testAlias1() {
+ public final void testAlias1() throws PlanningException {
Expr expr = sqlAnalyzer.parse(ALIAS[0]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
LogicalRootNode root = (LogicalRootNode) plan;
@@ -638,7 +637,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testAlias2() {
+ public final void testAlias2() throws PlanningException {
Expr expr = sqlAnalyzer.parse(ALIAS[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
LogicalRootNode root = (LogicalRootNode) plan;
@@ -657,7 +656,7 @@ public class TestLogicalPlanner {
};
@Test
- public final void testCreateTableDef() {
+ public final void testCreateTableDef() throws PlanningException {
Expr expr = sqlAnalyzer.parse(CREATE_TABLE[0]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
LogicalRootNode root = (LogicalRootNode) plan;
@@ -741,7 +740,7 @@ public class TestLogicalPlanner {
};
@Test
- public final void testCubeBy() {
+ public final void testCubeBy() throws PlanningException {
Expr expr = sqlAnalyzer.parse(CUBE_ROLLUP[0]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -778,7 +777,6 @@ public class TestLogicalPlanner {
}
}
-
static final String setStatements [] = {
"select deptName from employee where deptName like 'data%' union select deptName from score where deptName like 'data%'",
"select deptName from employee union select deptName from score as s1 intersect select deptName from score as s2",
@@ -786,7 +784,7 @@ public class TestLogicalPlanner {
};
@Test
- public final void testSetPlan() {
+ public final void testSetPlan() throws PlanningException {
Expr expr = sqlAnalyzer.parse(setStatements[0]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -803,7 +801,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testSetPlan2() {
+ public final void testSetPlan2() throws PlanningException {
// for testing multiple set statements
Expr expr = sqlAnalyzer.parse(setStatements[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -820,7 +818,7 @@ public class TestLogicalPlanner {
}
@Test
- public final void testSetPlan3() {
+ public final void testSetPlan3() throws PlanningException {
// for testing multiple set statements
Expr expr = sqlAnalyzer.parse(setStatements[2]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -839,8 +837,6 @@ public class TestLogicalPlanner {
assertEquals(NodeType.PROJECTION, intersect.getRightChild().getType());
}
-
-
static final String [] setQualifiers = {
"select name, empid from employee",
"select distinct name, empid from employee",
@@ -848,7 +844,7 @@ public class TestLogicalPlanner {
};
@Test
- public void testSetQualifier() {
+ public void testSetQualifier() throws PlanningException {
Expr context = sqlAnalyzer.parse(setQualifiers[0]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
testJsonSerDerObject(plan);
@@ -881,4 +877,93 @@ public class TestLogicalPlanner {
LogicalNode fromJson = CoreGsonHelper.fromJson(json, LogicalNode.class);
assertEquals("JSON (de) serialization equivalence check", rootNode, fromJson);
}
+
+ // Table descriptions
+ //
+ // employee (name text, empid int4, deptname text)
+ // dept (deptname text, manager text)
+ // score (deptname text, score int4)
+
+ static final String [] insertStatements = {
+ "insert into score select name from employee", // 0
+ "insert into score select name, empid from employee", // 1
+ "insert into employee (name, deptname) select * from dept", // 2
+ "insert into location '/tmp/data' select name, empid from employee", // 3
+ "insert overwrite into employee (name, deptname) select * from dept", // 4
+ "insert overwrite into LOCATION '/tmp/data' select * from dept" // 5
+ };
+
+ @Test
+ public final void testInsertInto0() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[0]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertFalse(insertNode.isOverwrite());
+ assertTrue(insertNode.hasTargetTable());
+ assertEquals("score", insertNode.getTargetTable().getName());
+ }
+
+ @Test
+ public final void testInsertInto1() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[1]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertFalse(insertNode.isOverwrite());
+ assertEquals("score", insertNode.getTargetTable().getName());
+ }
+
+ @Test
+ public final void testInsertInto2() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[2]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertFalse(insertNode.isOverwrite());
+ assertEquals("employee", insertNode.getTargetTable().getName());
+ assertTrue(insertNode.hasTargetSchema());
+ assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
+ assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
+ }
+
+ @Test
+ public final void testInsertInto3() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[3]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertFalse(insertNode.isOverwrite());
+ assertTrue(insertNode.hasPath());
+ }
+
+ @Test
+ public final void testInsertInto4() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[4]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertTrue(insertNode.isOverwrite());
+ assertTrue(insertNode.hasTargetTable());
+ assertEquals("employee", insertNode.getTargetTable().getName());
+ assertTrue(insertNode.hasTargetSchema());
+ assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
+ assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
+ }
+
+ @Test
+ public final void testInsertInto5() throws PlanningException {
+ Expr expr = sqlAnalyzer.parse(insertStatements[5]);
+ LogicalPlan plan = planner.createPlan(expr);
+ assertEquals(2, plan.getQueryBlocks().size());
+ InsertNode insertNode = getInsertNode(plan);
+ assertTrue(insertNode.isOverwrite());
+ assertTrue(insertNode.hasPath());
+ }
+
+ private static InsertNode getInsertNode(LogicalPlan plan) {
+ LogicalRootNode root = (LogicalRootNode) plan.getRootBlock().getRoot();
+ assertEquals(NodeType.INSERT, root.getChild().getType());
+ return (InsertNode) root.getChild();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 1e95355..4821751 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -96,7 +96,7 @@ public class TestPlannerUtil {
}
@Test
- public final void testTransformTwoPhase() {
+ public final void testTransformTwoPhase() throws PlanningException {
// without 'having clause'
Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[7]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -111,7 +111,7 @@ public class TestPlannerUtil {
}
@Test
- public final void testTrasformTwoPhaseWithStore() {
+ public final void testTrasformTwoPhaseWithStore() throws PlanningException {
Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[9]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
@@ -141,7 +141,7 @@ public class TestPlannerUtil {
}
@Test
- public final void testFindTopNode() throws CloneNotSupportedException {
+ public final void testFindTopNode() throws CloneNotSupportedException, PlanningException {
// two relations
Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index bea1eb6..97459f9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
@@ -129,7 +130,7 @@ public class TestBNLJoinExec {
"inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
@Test
- public final void testBNLCrossJoin() throws IOException {
+ public final void testBNLCrossJoin() throws IOException, PlanningException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -164,7 +165,7 @@ public class TestBNLJoinExec {
}
@Test
- public final void testBNLInnerJoin() throws IOException {
+ public final void testBNLInnerJoin() throws IOException, PlanningException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 621f5a2..0fc3773 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -29,10 +29,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -110,7 +107,7 @@ public class TestExternalSortExec {
};
@Test
- public final void testNext() throws IOException {
+ public final void testNext() throws IOException, PlanningException {
Fragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 9422358..e270df3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -32,6 +32,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
@@ -129,7 +130,7 @@ public class TestHashJoinExec {
};
@Test
- public final void testHashInnerJoin() throws IOException {
+ public final void testHashInnerJoin() throws IOException, PlanningException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index 9d85970..776882b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -33,6 +33,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.storage.*;
@@ -145,7 +146,7 @@ public class TestMergeJoinExec {
};
@Test
- public final void testMergeInnerJoin() throws IOException {
+ public final void testMergeInnerJoin() throws IOException, PlanningException {
Fragment[] empFrags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = sm.splitNG(conf, "people", people.getMeta(), people.getPath(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 7235924..9289dc9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -19,24 +19,27 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
@@ -129,7 +132,7 @@ public class TestNLJoinExec {
};
@Test
- public final void testNLCrossJoin() throws IOException {
+ public final void testNLCrossJoin() throws IOException, PlanningException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
@@ -156,7 +159,7 @@ public class TestNLJoinExec {
}
@Test
- public final void testNLInnerJoin() throws IOException {
+ public final void testNLInnerJoin() throws IOException, PlanningException {
Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
new file mode 100644
index 0000000..e9a0c65
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+import static org.junit.Assert.*;
+
+/** Integration tests for INSERT OVERWRITE, run through {@link TpchTestBase} against its testing cluster. */
+@Category(IntegrationTest.class)
+public class TestInsertQuery {
+  private static TpchTestBase tpch;
+
+  public TestInsertQuery() throws IOException {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  /** Returns the catalog served by the master of the testing cluster. */
+  private static CatalogService getCatalog() throws Exception {
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    return cluster.getMaster().getCatalog();
+  }
+
+  /** Runs a statement whose rows are not inspected and closes the result set if one is returned. */
+  private static void executeAndClose(String sql) throws Exception {
+    ResultSet res = tpch.execute(sql);
+    if (res != null) {
+      res.close();
+    }
+  }
+
+  /** Fills a freshly created three-column table from three lineitem columns and checks the row count. */
+  @Test
+  public final void testInsertOverwrite() throws Exception {
+    String tableName ="InsertOverwrite";
+    executeAndClose("create table " + tableName +" (col1 int8, col2 int4, col3 float4)");
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    executeAndClose("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  /** Inserts fewer columns than the table declares; the stored schema must stay unchanged. */
+  @Test
+  public final void testInsertOverwriteSmallerColumns() throws Exception {
+    String tableName = "insertoverwritesmallercolumns";
+    executeAndClose("create table " + tableName + " (col1 int8, col2 int4, col3 float4)");
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc originalDesc = catalog.getTableDesc(tableName);
+
+    executeAndClose("insert overwrite into " + tableName + " select l_orderkey from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+    assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema());
+  }
+
+  /**
+   * Inserts into an explicit target column list (col1, col3) and verifies every stored row;
+   * the omitted col2 is expected to read back as the string "null".
+   */
+  @Test
+  public final void testInsertOverwriteWithTargetColumns() throws Exception {
+    String tableName = "InsertOverwriteWithTargetColumns";
+    executeAndClose("create table " + tableName + " (col1 int8, col2 int4, col3 float4)");
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc originalDesc = catalog.getTableDesc(tableName);
+
+    executeAndClose("insert overwrite into " + tableName + " (col1, col3) select l_orderkey, l_quantity from lineitem");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getMeta().getStat().getNumRows().intValue());
+
+    // Expected (col1, col3) pairs in the order the rows are returned.
+    long[] expectedKeys = {1, 1, 2, 3, 3};
+    double[] expectedQuantities = {17.0, 36.0, 38.0, 45.0, 49.0};
+
+    ResultSet res = tpch.execute("select * from " + tableName);
+    try {
+      for (int i = 0; i < expectedKeys.length; i++) {
+        assertTrue(res.next());
+        assertEquals(expectedKeys[i], res.getLong(1));
+        assertEquals("null", res.getString(2));
+        // A delta of 0.0 keeps the original exact comparison but reports both values on failure.
+        assertEquals(expectedQuantities[i], res.getFloat(3), 0.0);
+      }
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+
+    assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema());
+  }
+
+  /** Overwrites a table created from lineitem with the l_orderkey = 3 subset, selected with an asterisk. */
+  @Test
+  public final void testInsertOverwriteWithAsterisk() throws Exception {
+    String tableName = "testinsertoverwritewithasterisk";
+    executeAndClose("create table " + tableName + " as select * from lineitem");
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    executeAndClose("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(2, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  /** Checks that the recorded row count drops from five to two after the table is overwritten. */
+  @Test
+  public final void testInsertOverwriteIntoSelect() throws Exception {
+    String tableName = "insertoverwriteintoselect";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " as select l_orderkey from lineitem");
+    try {
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc orderKeys = catalog.getTableDesc(tableName);
+    assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue());
+
+    // this query will result in the two rows.
+    res = tpch.execute(
+        "insert overwrite into " + tableName + " select l_orderkey from lineitem where l_orderkey = 3");
+    try {
+      assertFalse(res.next());
+    } finally {
+      res.close();
+    }
+
+    assertTrue(catalog.existsTable(tableName));
+    orderKeys = catalog.getTableDesc(tableName);
+    assertEquals(2, orderKeys.getMeta().getStat().getNumRows().intValue());
+  }
+
+  /** Same scenario as the asterisk test, but with a mixed-case table name. */
+  @Test
+  public final void testInsertOverwriteCapitalTableName() throws Exception {
+    String tableName = "testInsertOverwriteCapitalTableName";
+    executeAndClose("create table " + tableName + " as select * from lineitem");
+    CatalogService catalog = getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    executeAndClose("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3");
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(2, desc.getMeta().getStat().getNumRows().intValue());
+  }
+
+  /**
+   * Overwrites a bare location instead of a table and expects exactly one entry in that directory.
+   * NOTE(review): the path reuses the name of the table from testInsertOverwriteCapitalTableName;
+   * confirm the two tests cannot interfere with each other.
+   */
+  @Test
+  public final void testInsertOverwriteLocation() throws Exception {
+    String location = "/tajo-data/testInsertOverwriteCapitalTableName";
+    executeAndClose("insert overwrite into location '" + location + "' select * from lineitem where l_orderkey = 3");
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    Path target = new Path(location);
+    assertTrue(fs.exists(target));
+    assertEquals(1, fs.listStatus(target).length);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ffdb1c71/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index e96f7e5..f2a2a71 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.query;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -44,7 +47,7 @@ public class TestSelectQuery {
public static void setUp() throws Exception {
tpch = TpchTestBase.getInstance();
}
-
+
@Test
public final void testSelect() throws Exception {
ResultSet res = tpch.execute("select l_orderkey, l_partkey from lineitem");
@@ -277,7 +280,7 @@ public class TestSelectQuery {
try {
Map<Integer, String> result = Maps.newHashMap();
- result.put(0, "NULL");
+ result.put(0, "null");
result.put(1, "one");
result.put(2, "two");
result.put(3, "three");
@@ -406,15 +409,12 @@ public class TestSelectQuery {
public final void testCreateAfterSelect() throws Exception {
ResultSet res = tpch.execute(
"create table orderkeys as select l_orderkey from lineitem");
- try {
- int count = 0;
- for (;res.next();) {
- count++;
- }
- assertEquals(count, 5);
- } finally {
- res.close();
- }
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable("orderkeys"));
+ TableDesc orderKeys = catalog.getTableDesc("orderkeys");
+ assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue());
}
//@Test