You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/22 09:49:48 UTC
git commit: TAJO-543: InsertNode and CreateTableNode should play
their roles.
Updated Branches:
refs/heads/master 358dbace5 -> d06dd852e
TAJO-543: InsertNode and CreateTableNode should play their roles.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d06dd852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d06dd852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d06dd852
Branch: refs/heads/master
Commit: d06dd852e5f299a9cb329f870e4795a7bfda1917
Parents: 358dbac
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jan 22 13:44:41 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jan 22 17:40:37 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/engine/planner/BaseAlgebraVisitor.java | 6 +-
.../engine/planner/BasicLogicalPlanVisitor.java | 12 +-
.../apache/tajo/engine/planner/InsertNode.java | 115 +++++-----
.../engine/planner/LogicalPlanPreprocessor.java | 3 +-
.../engine/planner/LogicalPlanVerifier.java | 12 +-
.../tajo/engine/planner/LogicalPlanner.java | 223 ++++++++++++++-----
.../engine/planner/PhysicalPlannerImpl.java | 18 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 3 +-
.../engine/planner/PreLogicalPlanVerifier.java | 49 ++++
.../engine/planner/global/GlobalPlanner.java | 36 ++-
.../engine/planner/logical/CreateTableNode.java | 79 +------
.../logical/PartitionedTableScanNode.java | 1 -
.../planner/logical/PersistentStoreNode.java | 24 +-
.../planner/logical/ShuffleFileWriteNode.java | 4 +-
.../engine/planner/logical/StoreIndexNode.java | 4 +-
.../engine/planner/logical/StoreTableNode.java | 46 +---
.../tajo/engine/planner/logical/UnaryNode.java | 3 +-
.../ColumnPartitionedTableStoreExec.java | 32 +--
.../engine/planner/physical/StoreTableExec.java | 11 +-
.../planner/rewrite/ProjectionPushDownRule.java | 34 ++-
.../org/apache/tajo/master/GlobalEngine.java | 146 +++---------
.../apache/tajo/master/querymaster/Query.java | 32 ++-
.../tajo/engine/planner/TestLogicalPlanner.java | 22 +-
.../tajo/engine/query/TestInsertQuery.java | 4 +-
25 files changed, 486 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e4feb6a..020787c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -123,6 +123,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-543: InsertNode and CreateTableNode should play their roles. (hyunsik)
+
TAJO-409: Add explored and explained annotations to Tajo function system.
(SeongHwa Ahn via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
index ddc75ff..6cc6fd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BaseAlgebraVisitor.java
@@ -269,8 +269,10 @@ public class BaseAlgebraVisitor<CONTEXT, RESULT> implements AlgebraVisitor<CONTE
@Override
public RESULT visitProjection(CONTEXT ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
stack.push(expr);
- for (NamedExpr target : expr.getNamedExprs()) {
- visit(ctx, stack, target);
+ if (!expr.isAllProjected()) {
+ for (NamedExpr target : expr.getNamedExprs()) {
+ visit(ctx, stack, target);
+ }
}
RESULT result = visit(ctx, stack, expr.getChild());
stack.pop();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 59c6872..b629013 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -256,15 +256,21 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
public RESULT visitInsert(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
Stack<LogicalNode> stack) throws PlanningException {
stack.push(node);
- RESULT result = visit(context, plan, plan.getBlock(node.getSubQuery()), node.getSubQuery(), stack);
+ RESULT result = visit(context, plan, block, node.getChild(), stack);
stack.pop();
return result;
}
@Override
public RESULT visitCreateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node,
- Stack<LogicalNode> stack) {
- return null;
+ Stack<LogicalNode> stack) throws PlanningException {
+ RESULT result = null;
+ stack.push(node);
+ if (node.hasSubQuery()) {
+ result = visit(context, plan, block, node.getChild(), stack);
+ }
+ stack.pop();
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ecba05a..3dd371b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -20,30 +20,38 @@ package org.apache.tajo.engine.planner;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.LogicalNodeVisitor;
import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.util.TUtil;
-public class InsertNode extends LogicalNode implements Cloneable {
+public class InsertNode extends StoreTableNode implements Cloneable {
@Expose private boolean overwrite;
- @Expose private TableDesc targetTableDesc;
+ @Expose private Schema tableSchema;
+
+ /** a target schema of a target table */
@Expose private Schema targetSchema;
+ /** an output schema of select clause */
+ @Expose private Schema projectedSchema;
@Expose private Path path;
- @Expose private StoreType storageType;
- @Expose private Options options;
- @Expose private LogicalNode subQuery;
public InsertNode(int pid) {
super(pid, NodeType.INSERT);
}
- public void setTargetTableDesc(TableDesc desc) {
- this.targetTableDesc = desc;
+ public void setTargetTable(TableDesc desc) {
+ setTableName(desc.getName());
+ tableSchema = desc.getSchema();
+ setPath(desc.getPath());
+ setOptions(desc.getMeta().getOptions());
+ setStorageType(desc.getMeta().getStoreType());
+
+ if (desc.hasPartitions()) {
+ this.setPartitions(desc.getPartitions());
+ }
}
public void setTargetLocation(Path path) {
@@ -51,7 +59,7 @@ public class InsertNode extends LogicalNode implements Cloneable {
}
public void setSubQuery(LogicalNode subQuery) {
- this.subQuery = subQuery;
+ this.setChild(subQuery);
this.setInSchema(subQuery.getOutSchema());
this.setOutSchema(subQuery.getOutSchema());
}
@@ -64,12 +72,12 @@ public class InsertNode extends LogicalNode implements Cloneable {
this.overwrite = overwrite;
}
- public boolean hasTargetTable() {
- return this.targetTableDesc != null;
+ public Schema getTableSchema() {
+ return tableSchema;
}
- public TableDesc getTargetTable() {
- return this.targetTableDesc;
+ public void setTableSchema(Schema tableSchema) {
+ this.tableSchema = tableSchema;
}
public boolean hasTargetSchema() {
@@ -84,6 +92,14 @@ public class InsertNode extends LogicalNode implements Cloneable {
this.targetSchema = schema;
}
+ public Schema getProjectedSchema() {
+ return this.projectedSchema;
+ }
+
+ public void setProjectedSchema(Schema projected) {
+ this.projectedSchema = projected;
+ }
+
public boolean hasPath() {
return this.path != null;
}
@@ -99,42 +115,16 @@ public class InsertNode extends LogicalNode implements Cloneable {
public boolean hasStorageType() {
return this.storageType != null;
}
-
- public void setStorageType(StoreType storageType) {
- this.storageType = storageType;
- }
-
- public StoreType getStorageType() {
- return this.storageType;
- }
-
- public boolean hasOptions() {
- return this.options != null;
- }
-
- public void setOptions(Options opt) {
- this.options = opt;
- }
-
- public Options getOptions() {
- return this.options;
- }
-
- public LogicalNode getSubQuery() {
- return this.subQuery;
- }
@Override
public boolean equals(Object obj) {
if (obj instanceof InsertNode) {
InsertNode other = (InsertNode) obj;
return super.equals(other)
- && TUtil.checkEquals(this.targetTableDesc, other.targetTableDesc)
- && TUtil.checkEquals(path, other.path)
&& this.overwrite == other.overwrite
- && TUtil.checkEquals(this.storageType, other.storageType)
- && TUtil.checkEquals(options, other.options)
- && subQuery.equals(other.subQuery);
+ && TUtil.checkEquals(this.tableSchema, other.tableSchema)
+ && TUtil.checkEquals(this.targetSchema, other.targetSchema)
+ && TUtil.checkEquals(path, other.path);
} else {
return false;
}
@@ -142,14 +132,12 @@ public class InsertNode extends LogicalNode implements Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
- InsertNode store = (InsertNode) super.clone();
- store.targetTableDesc = targetTableDesc != null ? targetTableDesc : null;
- store.path = path != null ? new Path(path.toString()) : null;
- store.overwrite = overwrite;
- store.storageType = storageType != null ? storageType : null;
- store.options = (Options) (options != null ? options.clone() : null);
- store.subQuery = (LogicalNode) subQuery.clone();
- return store;
+ InsertNode insertNode = (InsertNode) super.clone();
+ insertNode.overwrite = overwrite;
+ insertNode.tableSchema = new Schema(tableSchema);
+ insertNode.targetSchema = targetSchema != null ? new Schema(targetSchema) : null;
+ insertNode.path = path != null ? new Path(path.toString()) : null;
+ return insertNode;
}
public String toString() {
@@ -158,38 +146,47 @@ public class InsertNode extends LogicalNode implements Cloneable {
if (overwrite) {
sb.append("OVERWRITE ");
}
- sb.append("INTO");
+ sb.append("INTO ");
if (hasTargetTable()) {
- sb.append(targetTableDesc.getName());
+ sb.append(" ").append(tableName);
+ }
+
+ if (hasTargetSchema()) {
+ sb.append("\n ").append(targetSchema);
}
if (hasPath()) {
- sb.append(" LOCATION ");
+ sb.append("\n LOCATION ");
sb.append(path);
}
- sb.append(" [SUBQUERY]");
+ sb.append("\n").append(getChild());
return sb.toString();
}
@Override
public void preOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
+ getChild().preOrder(visitor);
+ visitor.visit(this);
}
@Override
public void postOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
+ visitor.visit(this);
+ getChild().postOrder(visitor);
}
@Override
public PlanString getPlanString() {
PlanString planString = new PlanString("INSERT");
- planString.addExplan(" INTO ");
+ planString.appendTitle(" INTO ");
if (hasTargetTable()) {
- planString.addExplan(getTargetTable().getName());
+ planString.appendTitle(getTableName());
+ if (hasTargetSchema()) {
+ planString.addExplan(getTargetSchema().toString());
+ }
} else {
planString.addExplan("LOCATION " + path);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
index 3ab328c..586ef68 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -290,8 +290,7 @@ class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor
///////////////////////////////////////////////////////////////////////////////////////////////////////////
public LogicalNode visitInsert(PreprocessContext ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
- PreprocessContext newContext = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
- LogicalNode child = super.visitInsert(newContext, stack, expr);
+ LogicalNode child = super.visitInsert(ctx, stack, expr);
InsertNode insertNode = new InsertNode(ctx.plan.newPID());
insertNode.setInSchema(child.getOutSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index 41e42d0..ac76f57 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -164,21 +164,17 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
StoreTableNode node, Stack<LogicalNode> stack) throws PlanningException {
visit(state, plan, block, node.getChild(), stack);
- if (node.isCreatedTable() && catalog.existsTable(node.getTableName())) {
- state.addVerification("relation \"" + node.getTableName() + "\" already exists");
- }
-
return node;
}
@Override
public LogicalNode visitInsert(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
- LogicalNode child = visit(state, plan, plan.getBlock(node.getSubQuery()), node.getSubQuery(), stack);
+ LogicalNode child = visit(state, plan, block, node.getChild(), stack);
- if (node.hasTargetSchema()) {
- ensureDomains(state, node.getTargetSchema(), node.getSubQuery().getOutSchema());
- }
+// if (node.hasTargetSchema()) {
+// ensureDomains(state, node.getTargetSchema(), node.getChild().getOutSchema());
+// }
return child;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 49d5ffb..8995226 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.exception.VerifyException;
@@ -1041,60 +1042,153 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws PlanningException {
stack.push(expr);
- QueryBlock newQueryBlock = context.plan.getBlockByExpr(expr.getSubQuery());
- PlanContext newContext = new PlanContext(context, newQueryBlock);
- Stack<Expr> subStack = new Stack<Expr>();
- LogicalNode subQuery = visit(newContext, subStack, expr.getSubQuery());
- context.plan.connectBlocks(newQueryBlock, context.queryBlock, BlockType.TableSubQuery);
+ LogicalNode subQuery = super.visitInsert(context, stack, expr);
stack.pop();
- InsertNode insertNode = null;
- if (expr.hasTableName()) {
- TableDesc desc = catalog.getTableDesc(expr.getTableName());
+ InsertNode insertNode = context.queryBlock.getNodeFromExpr(expr);
+ insertNode.setOverwrite(expr.isOverwrite());
+ insertNode.setSubQuery(subQuery);
+
+ if (expr.hasTableName()) { // INSERT (OVERWRITE) INTO TABLE ...
+ return buildInsertIntoTablePlan(context, insertNode, expr);
+ } else if (expr.hasLocation()) { // INSERT (OVERWRITE) INTO LOCATION ...
+ return buildInsertIntoLocationPlan(context, insertNode, expr);
+ } else {
+ throw new IllegalStateException("Invalid Query");
+ }
+ }
+
+ /**
+ * Builds an InsertNode with a target table.
+ *
+ * ex) INSERT OVERWRITE INTO TABLE ...
+ * <br />
+ *
+ * We use the following terms, such as target table and target columns:
+ * <pre>
+ * INSERT INTO TB_NAME (col1, col2) SELECT c1, c2 FROM ...
+ * ^^^^^^^ ^^^^^^^^^^^^ ^^^^^^^^^^^^
+ * target table target columns (or schema) projected columns (or schema)
+ * </pre>
+ */
+ private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
+ throws PlanningException {
+ // Get and set a target table
+ TableDesc desc = catalog.getTableDesc(expr.getTableName());
+ insertNode.setTargetTable(desc);
+
+ //
+ // When we use 'INSERT (OVERWRITE) INTO TABLE' statements, there are two cases.
+ //
+ // First, when a user specified target columns
+ // INSERT (OVERWRITE)? INTO table_name (col1 type, col2 type) SELECT ...
+ //
+ // Second, when a user does not specify target columns
+ // INSERT (OVERWRITE)? INTO table_name SELECT ...
+ //
+ // In the former case, target columns' schema and corresponding projected columns' schema
+ // must be equivalent or be available to cast implicitly.
+ //
+ // In the latter case, the target table's schema and projected column's
+ // schema of select clause can be different to each other. In this case,
+ // we use only a sequence of preceding columns of target table's schema
+ // as target columns.
+ //
+ // For example, consider a target table and an 'insert into' query are given as follows:
+ //
+ // CREATE TABLE TB1 (col1 int, col2 int, col3 long);
+ // || ||
+ // INSERT OVERWRITE INTO TB1 SELECT order_key, part_num FROM ...
+ //
+ // In this example, only col1 and col2 are used as target columns.
+
+ if (expr.hasTargetColumns()) { // when a user specified target columns
+
+ if (expr.getTargetColumns().length > insertNode.getChild().getOutSchema().getColumnNum()) {
+ throw new PlanningException("Target columns and projected columns are mismatched to each other");
+ }
+
+ // See PreLogicalPlanVerifier.visitInsert.
+ // It guarantees the equivalence between the numbers of target and projected columns.
+
context.queryBlock.addRelation(new ScanNode(context.plan.newPID(), desc));
+ String [] targets = expr.getTargetColumns();
+ Schema targetColumns = new Schema();
+ for (int i = 0; i < targets.length; i++) {
+ Column targetColumn = context.plan.resolveColumn(context.queryBlock, new ColumnReferenceExpr(targets[i]));
+ targetColumns.addColumn(targetColumn);
+ }
+ insertNode.setTargetSchema(targetColumns);
+ insertNode.setOutSchema(targetColumns);
+ buildProjectedInsert(insertNode);
- Schema targetSchema = new Schema();
- if (expr.hasTargetColumns()) {
- // INSERT OVERWRITE INTO TABLE tbl(col1 type, col2 type) SELECT ...
- String[] targetColumnNames = expr.getTargetColumns();
- for (int i = 0; i < targetColumnNames.length; i++) {
- Column targetColumn = context.plan.resolveColumn(context.queryBlock,
- new ColumnReferenceExpr(targetColumnNames[i]));
- targetSchema.addColumn(targetColumn);
- }
- } else {
- // use the output schema of select clause as target schema
- // if didn't specific target columns like the way below,
- // INSERT OVERWRITE INTO TABLE tbl SELECT ...
- Schema targetTableSchema = desc.getSchema();
- for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
- targetSchema.addColumn(targetTableSchema.getColumn(i));
- }
+ } else { // when a user do not specified target columns
+
+ // The output schema of select clause determines the target columns.
+ Schema tableSchema = desc.getSchema();
+ Schema projectedSchema = insertNode.getChild().getOutSchema();
+
+ Schema targetColumns = new Schema();
+ for (int i = 0; i < projectedSchema.getColumnNum(); i++) {
+ targetColumns.addColumn(tableSchema.getColumn(i));
}
+ insertNode.setTargetSchema(targetColumns);
+ buildProjectedInsert(insertNode);
+ }
- insertNode = context.queryBlock.getNodeFromExpr(expr);
- insertNode.setTargetTableDesc(desc);
- insertNode.setSubQuery(subQuery);
- insertNode.setTargetSchema(targetSchema);
- insertNode.setOutSchema(targetSchema);
+ if (desc.hasPartitions()) {
+ insertNode.setPartitions(desc.getPartitions());
}
+ return insertNode;
+ }
- if (expr.hasLocation()) {
- insertNode = context.queryBlock.getNodeFromExpr(expr);
- insertNode.setTargetLocation(new Path(expr.getLocation()));
- insertNode.setSubQuery(subQuery);
- if (expr.hasStorageType()) {
- insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- }
- if (expr.hasParams()) {
- Options options = new Options();
- options.putAll(expr.getParams());
- insertNode.setOptions(options);
+ private void buildProjectedInsert(InsertNode insertNode) {
+ Schema tableSchema = insertNode.getTableSchema();
+ Schema targetColumns = insertNode.getTargetSchema();
+
+ ProjectionNode projectionNode = insertNode.getChild();
+
+ // Modifying projected columns by adding NULL constants
+ // It is because that table appender does not support target columns to be written.
+ List<Target> targets = TUtil.newList();
+ for (int i = 0, j = 0; i < tableSchema.getColumnNum(); i++) {
+ Column column = tableSchema.getColumn(i);
+
+ if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
+ targets.add(projectionNode.getTargets()[j++]);
+ } else {
+ targets.add(new Target(new ConstEval(NullDatum.get()), column.getColumnName()));
}
}
+ projectionNode.setTargets(targets.toArray(new Target[targets.size()]));
- insertNode.setOverwrite(expr.isOverwrite());
+ insertNode.setInSchema(projectionNode.getOutSchema());
+ insertNode.setOutSchema(projectionNode.getOutSchema());
+ insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
+ }
+
+ /**
+ * Builds an InsertNode with a location.
+ *
+ * ex) INSERT OVERWRITE INTO LOCATION 'hdfs://....' ..
+ */
+ private InsertNode buildInsertIntoLocationPlan(PlanContext context, InsertNode insertNode, Insert expr) {
+ // INSERT (OVERWRITE)? INTO LOCATION path (USING file_type (param_clause)?)? query_expression
+ Schema childSchema = insertNode.getChild().getOutSchema();
+ insertNode.setInSchema(childSchema);
+ insertNode.setOutSchema(childSchema);
+ insertNode.setTableSchema(childSchema);
+ insertNode.setTargetLocation(new Path(expr.getLocation()));
+
+ if (expr.hasStorageType()) {
+ insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+ }
+ if (expr.hasParams()) {
+ Options options = new Options();
+ options.putAll(expr.getParams());
+ insertNode.setOptions(options);
+ }
return insertNode;
}
@@ -1106,43 +1200,50 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
throws PlanningException {
- String tableName = expr.getTableName();
+ // Get a table name to be created.
+ String tableNameTobeCreated = expr.getTableName();
- if (expr.hasSubQuery()) {
+ if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
stack.add(expr);
LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
stack.pop();
- StoreTableNode storeNode = new StoreTableNode(context.plan.newPID(), tableName);
- storeNode.setCreateTable();
- storeNode.setChild(subQuery);
+ CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+ createTableNode.setTableName(tableNameTobeCreated);
+ createTableNode.setChild(subQuery);
+ createTableNode.setInSchema(subQuery.getOutSchema());
- storeNode.setInSchema(subQuery.getOutSchema());
+ // if no table definition, the select clause's output schema will be used.
+ // ex) CREATE TABLE tbl AS SELECT ...
if(!expr.hasTableElements()) {
- // CREATE TABLE tbl AS SELECT ...
+
expr.setTableElements(convertSchemaToTableElements(subQuery.getOutSchema()));
}
- // else CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
- storeNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
- if (expr.hasStorageType()) {
- storeNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- } else {
+ // Otherwise, it uses the defined table elements.
+ // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
+ createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
+ createTableNode.setSchema(convertTableElementsSchema(expr.getTableElements()));
+
+ if (expr.hasStorageType()) { // If storage type (using clause) is specified
+ createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+ } else { // If no specified storage type
// default type
- storeNode.setStorageType(CatalogProtos.StoreType.CSV);
+ createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
}
- if (expr.hasParams()) {
+ if (expr.hasParams()) { // if 'with clause' is specified
Options options = new Options();
options.putAll(expr.getParams());
- storeNode.setOptions(options);
+ createTableNode.setOptions(options);
}
- if (expr.hasPartition()) {
- storeNode.setPartitions(convertTableElementsPartition(context, expr));
+ if (expr.hasPartition()) { // if 'partition by' is specified
+ createTableNode.setPartitions(convertTableElementsPartition(context, expr));
}
- return storeNode;
- } else {
+ return createTableNode;
+
+ } else { // if CREATE AN EMPTY TABLE
Schema tableSchema;
boolean mergedPartition = false;
if (expr.hasPartition()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index d9689aa..7de053b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -26,32 +26,32 @@ import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Projection;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.List;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
@@ -90,8 +90,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
- ShuffleFileWriteNode shuffleFileWriteNode =
- new ShuffleFileWriteNode(UNGENERATED_PID, channel.getTargetId().toString());
+ ShuffleFileWriteNode shuffleFileWriteNode = new ShuffleFileWriteNode(UNGENERATED_PID);
+ shuffleFileWriteNode.setTableName(channel.getTargetId().toString());
shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
shuffleFileWriteNode.setInSchema(plan.getOutSchema());
shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
@@ -116,6 +116,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
EvalExprNode evalExpr = (EvalExprNode) logicalNode;
return new EvalExprExec(ctx, evalExpr);
+ case CREATE_TABLE:
+ case INSERT:
case STORE:
StoreTableNode storeNode = (StoreTableNode) logicalNode;
leftExec = createPlanRecursive(ctx, storeNode.getChild());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 66358d5..ae5f4fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -48,7 +48,8 @@ public class PlannerUtil {
baseNode = ((LogicalRootNode) node).getChild();
}
- return baseNode.getType() == NodeType.CREATE_TABLE || baseNode.getType() == NodeType.DROP_TABLE;
+ return (baseNode.getType() == NodeType.CREATE_TABLE && !((CreateTableNode)baseNode).hasSubQuery()) ||
+ baseNode.getType() == NodeType.DROP_TABLE;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
new file mode 100644
index 0000000..77e2a0b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.Insert;
+import org.apache.tajo.algebra.OpType;
+import org.apache.tajo.algebra.Projection;
+
+import java.util.Stack;
+
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationState, Expr> {
+
+ public Expr visitInsert(VerificationState ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+ Expr child = super.visitInsert(ctx, stack, expr);
+
+ if (child != null && child.getType() == OpType.Projection) {
+ if (expr.hasTargetColumns()) {
+ Projection projection = (Projection) child;
+ int projectColumnNum = projection.getNamedExprs().length;
+ int targetColumnNum = expr.getTargetColumns().length;
+
+ if (targetColumnNum > projectColumnNum) {
+ ctx.addVerification("INSERT has more target columns than expressions");
+ } else if (targetColumnNum < projectColumnNum) {
+ ctx.addVerification("INSERT has more expressions than target columns");
+ }
+ }
+ }
+
+ return child;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 58ad73c..341a58a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -389,12 +389,27 @@ public class GlobalPlanner {
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
DataChannel channel;
CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+
if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
Column[] columns = new Column[partitionDesc.getColumns().size()];
- channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+
+ if (currentNode.getType() == NodeType.INSERT) {
+ InsertNode insertNode = (InsertNode) currentNode;
+ channel.setSchema(((InsertNode)currentNode).getProjectedSchema());
+ Column [] shuffleKeys = new Column[partitionDesc.getColumns().size()];
+ int i = 0;
+ for (Column column : partitionDesc.getColumns()) {
+ int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+ shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
+ }
+ channel.setShuffleKeys(shuffleKeys);
+ } else {
+ channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+ }
channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
+
} else {
throw new PlanningException(String.format("Not Supported PartitionsType :%s", partitionsType));
}
@@ -679,13 +694,26 @@ public class GlobalPlanner {
}
@Override
+ public LogicalNode visitCreateTable(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
+ CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+ LogicalNode child = super.visitStoreTable(context, plan, queryBlock, node, stack);
+
+ ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+ ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
+ context.execBlockMap.put(node.getPID(), newExecBlock);
+
+ return node;
+ }
+
+ @Override
public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
InsertNode node, Stack<LogicalNode> stack)
throws PlanningException {
LogicalNode child = super.visitInsert(context, plan, queryBlock, node, stack);
- ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
- execBlock.setPlan(node);
- context.execBlockMap.put(node.getPID(), execBlock);
+
+ ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+ ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
+ context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 3dc5655..66a2355 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -20,36 +20,20 @@ package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
-public class CreateTableNode extends LogicalNode implements Cloneable {
- @Expose private String tableName;
- @Expose private Column[] partitionKeys;
- @Expose private StoreType storageType;
+public class CreateTableNode extends StoreTableNode implements Cloneable {
@Expose private Schema schema;
@Expose private Path path;
- @Expose private Options options;
@Expose private boolean external;
- @Expose private PartitionDesc partitionDesc;
public CreateTableNode(int pid) {
super(pid, NodeType.CREATE_TABLE);
}
- public void setTableName(String name) {
- this.tableName = name;
- }
-
- public final String getTableName() {
- return this.tableName;
- }
-
public void setSchema(Schema schema) {
this.schema = schema;
}
@@ -58,14 +42,6 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
return this.schema;
}
- public void setStorageType(StoreType storageType) {
- this.storageType = storageType;
- }
-
- public StoreType getStorageType() {
- return this.storageType;
- }
-
public boolean hasPath() {
return this.path != null;
}
@@ -77,18 +53,6 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
public Path getPath() {
return this.path;
}
-
- public boolean hasOptions() {
- return this.options != null;
- }
-
- public void setOptions(Options opt) {
- this.options = opt;
- }
-
- public Options getOptions() {
- return this.options;
- }
public boolean isExternal() {
return external;
@@ -98,16 +62,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
this.external = external;
}
- public PartitionDesc getPartitions() {
- return partitionDesc;
- }
-
- public void setPartitions(PartitionDesc partitionDesc) {
- this.partitionDesc = partitionDesc;
- }
-
- public boolean hasPartition() {
- return this.partitionDesc != null;
+ public boolean hasSubQuery() {
+ return child != null;
}
@Override
@@ -120,14 +76,9 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
if (obj instanceof CreateTableNode) {
CreateTableNode other = (CreateTableNode) obj;
return super.equals(other)
- && this.tableName.equals(other.tableName)
&& this.schema.equals(other.schema)
- && this.storageType == other.storageType
&& this.external == other.external
- && TUtil.checkEquals(path, other.path)
- && TUtil.checkEquals(options, other.options)
- && TUtil.checkEquals(partitionKeys, other.partitionKeys)
- && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+ && TUtil.checkEquals(path, other.path);
} else {
return false;
}
@@ -141,29 +92,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
store.storageType = storageType;
store.external = external;
store.path = path != null ? new Path(path.toString()) : null;
- store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
store.options = (Options) (options != null ? options.clone() : null);
- store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
return store;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\"CreateTable\": {\"table\": \""+tableName+"\",");
- if (partitionKeys != null) {
- sb.append("\"partition keys: [");
- for (int i = 0; i < partitionKeys.length; i++) {
- sb.append(partitionKeys[i]);
- if (i < partitionKeys.length - 1)
- sb.append(",");
- }
- sb.append("],");
- }
sb.append("\"schema: \"{" + this.schema).append("}");
sb.append(",\"storeType\": \"" + this.storageType);
sb.append(",\"path\" : \"" + this.path).append("\",");
sb.append(",\"external\" : \"" + this.external).append("\",");
- sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema())
@@ -174,11 +113,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
@Override
public void preOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
+ if (hasSubQuery()) {
+ child.preOrder(visitor);
+ }
+ visitor.visit(this);
}
@Override
public void postOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
+ visitor.visit(this);
+ if (hasSubQuery()) {
+ child.preOrder(visitor);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
index 21f5ef4..d687829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -82,7 +82,6 @@ public class PartitionedTableScanNode extends ScanNode {
sb.append("\n");
}
- sb.append(",");
sb.append("\n \"out schema\": ").append(getOutSchema());
sb.append("\n \"in schema\": ").append(getInSchema());
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
index 2f1a487..1bd3548 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -21,10 +21,11 @@ package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
+import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+
/**
* <code>PersistentStoreNode</code> is an expression for a persistent data store step.
@@ -32,11 +33,18 @@ import org.apache.tajo.util.TUtil;
*/
public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
@Expose protected String tableName;
- @Expose protected CatalogProtos.StoreType storageType = CatalogProtos.StoreType.CSV;
+ @Expose protected StoreType storageType = StoreType.CSV;
@Expose protected Options options;
- public PersistentStoreNode(int pid, String tableName) {
- super(pid, NodeType.STORE);
+ public PersistentStoreNode(int pid, NodeType nodeType) {
+ super(pid, nodeType);
+ }
+
+ public boolean hasTargetTable() {
+ return tableName != null;
+ }
+
+ public void setTableName(String tableName) {
this.tableName = tableName;
}
@@ -44,11 +52,11 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
return this.tableName;
}
- public void setStorageType(CatalogProtos.StoreType storageType) {
+ public void setStorageType(StoreType storageType) {
this.storageType = storageType;
}
- public CatalogProtos.StoreType getStorageType() {
+ public StoreType getStorageType() {
return this.storageType;
}
@@ -60,6 +68,10 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
return this.options;
}
+ public void setOptions(Options options) {
+ this.options = options;
+ }
+
@Override
public PlanString getPlanString() {
PlanString planStr = new PlanString("Store");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
index 180b1a2..3abec9d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -34,8 +34,8 @@ public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneab
@Expose private int numOutputs;
@Expose private Column [] shuffleKeys;
- public ShuffleFileWriteNode(int pid, String tableName) {
- super(pid, tableName);
+ public ShuffleFileWriteNode(int pid) {
+ super(pid, NodeType.STORE);
}
public final int getNumOutputs() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
index c3d7307..fb8ae3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.logical;
public class StoreIndexNode extends StoreTableNode {
- public StoreIndexNode(int pid, String tableName) {
- super(pid, tableName);
+ public StoreIndexNode(int pid) {
+ super(pid);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 843a70f..2aca6a7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -19,45 +19,23 @@
package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-
public class StoreTableNode extends PersistentStoreNode implements Cloneable {
- @Expose private boolean isCreatedTable = false;
- @Expose private boolean isOverwritten = false;
@Expose private PartitionDesc partitionDesc;
- public StoreTableNode(int pid, String tableName) {
- super(pid, tableName);
- }
-
- public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
- super(pid, tableName);
- this.partitionDesc = partitionDesc;
- }
-
- public void setStorageType(StoreType storageType) {
- this.storageType = storageType;
- }
-
- public StoreType getStorageType() {
- return this.storageType;
- }
-
- public boolean hasOptions() {
- return this.options != null;
+ public StoreTableNode(int pid) {
+ super(pid, NodeType.STORE);
}
- public void setOptions(Options options) {
- this.options = options;
+ protected StoreTableNode(int pid, NodeType nodeType) {
+ super(pid, nodeType);
}
- public Options getOptions() {
- return this.options;
+ public boolean hasPartition() {
+ return this.partitionDesc != null;
}
public PartitionDesc getPartitions() {
@@ -76,22 +54,12 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
return planStr;
}
-
- public boolean isCreatedTable() {
- return isCreatedTable;
- }
-
- public void setCreateTable() {
- isCreatedTable = true;
- }
@Override
public boolean equals(Object obj) {
if (obj instanceof StoreTableNode) {
StoreTableNode other = (StoreTableNode) obj;
boolean eq = super.equals(other);
- eq = eq && isCreatedTable == other.isCreatedTable;
- eq = eq && isOverwritten == other.isOverwritten;
eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
return eq;
} else {
@@ -102,8 +70,6 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
StoreTableNode store = (StoreTableNode) super.clone();
- store.isCreatedTable = isCreatedTable;
- store.isOverwritten = isOverwritten;
store.partitionDesc = partitionDesc;
return store;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 8817c41..0b06e9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
+import org.apache.tajo.util.TUtil;
public abstract class UnaryNode extends LogicalNode implements Cloneable {
@@ -43,7 +44,7 @@ public abstract class UnaryNode extends LogicalNode implements Cloneable {
public boolean deepEquals(Object o) {
if (o instanceof UnaryNode) {
UnaryNode u = (UnaryNode) o;
- return equals(o) && child.deepEquals(u.child);
+ return equals(o) && TUtil.checkEquals(child, u.child);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index c873b93..338d2f2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -34,6 +34,8 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.Appender;
@@ -83,21 +85,21 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
}
// Find column index to name subpartition directory path
- if (this.plan.getPartitions() != null) {
- if (this.plan.getPartitions().getColumns() != null) {
- partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
- partitionColumnNames = new String[partitionColumnIndices.length];
- Schema columnPartitionSchema = plan.getPartitions().getSchema();
- for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++) {
- Column targetColumn = columnPartitionSchema.getColumn(i);
- for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
- Column inputColumn = plan.getInSchema().getColumn(j);
- if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
- partitionColumnIndices[i] = j;
- partitionColumnNames[i] = targetColumn.getColumnName();
- }
- }
- }
+ int partitionKeyNum = this.plan.getPartitions().getColumns().size();
+ partitionColumnIndices = new int[partitionKeyNum];
+ partitionColumnNames = new String[partitionKeyNum];
+ for (int i = 0; i < partitionKeyNum; i++) {
+ Column column = this.plan.getPartitions().getColumns().get(i);
+ partitionColumnNames[i] = column.getColumnName();
+
+ if (this.plan.getType() == NodeType.INSERT) {
+ InsertNode insertNode = ((InsertNode)plan);
+ int idx = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+ partitionColumnIndices[i] = idx;
+ } else {
+ // We can get partition column from a logical schema.
+ // Don't use output schema because it is rewritten.
+ partitionColumnIndices[i] = plan.getOutSchema().getColumnId(column.getQualifiedName());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index affdc86..f097d0c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -23,6 +23,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.InsertNode;
import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
@@ -54,8 +55,14 @@ public class StoreTableExec extends UnaryPhysicalExec {
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
- context.getOutputPath());
+ if (plan instanceof InsertNode) {
+ InsertNode createTableNode = (InsertNode) plan;
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+ createTableNode.getTableSchema(), context.getOutputPath());
+ } else {
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
+ context.getOutputPath());
+ }
appender.enableStats();
appender.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 86599f3..6264961 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -61,14 +61,7 @@ public class ProjectionPushDownRule extends
public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
- LogicalPlan.QueryBlock topmostBlock;
-
- // skip a non-table-expression block.
- if (plan.getRootBlock().getRootType() == NodeType.INSERT) {
- topmostBlock = plan.getChildBlocks(rootBlock).get(0);
- } else {
- topmostBlock = rootBlock;
- }
+ LogicalPlan.QueryBlock topmostBlock = rootBlock;
Stack<LogicalNode> stack = new Stack<LogicalNode>();
Context context = new Context(plan);
@@ -98,6 +91,11 @@ public class ProjectionPushDownRule extends
}
private String add(String name, EvalNode evalNode) throws PlanningException {
+ if (evalNode.getType() == EvalType.CONST) {
+ nameToEvalMap.put(name, evalNode);
+ resolvedFlags.put(name, false);
+ return name;
+ }
if (evalToNameMap.containsKey(evalNode)) {
name = evalToNameMap.get(evalNode);
} else {
@@ -178,6 +176,9 @@ public class ProjectionPushDownRule extends
public void resolve(Target target) {
EvalNode evalNode = target.getEvalTree();
+ if (evalNode.getType() == EvalType.CONST) { // if constant value
+ return; // keep it raw always
+ }
if (!evalToNameMap.containsKey(evalNode)) {
throw new RuntimeException("No such eval: " + evalNode);
}
@@ -327,7 +328,13 @@ public class ProjectionPushDownRule extends
case INSERT:
InsertNode insertNode = (InsertNode) parentNode;
insertNode.setSubQuery(child);
-
+ break;
+ case CREATE_TABLE:
+ CreateTableNode createTableNode = (CreateTableNode) parentNode;
+ createTableNode.setChild(child);
+ createTableNode.setInSchema(child.getOutSchema());
+ createTableNode.setInSchema(child.getOutSchema());
+ break;
default:
throw new PlanningException("Unexpected Parent Node: " + parentNode.getType());
}
@@ -743,4 +750,13 @@ public class ProjectionPushDownRule extends
return node;
}
+
+ @Override
+ public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
+ Stack<LogicalNode> stack) throws PlanningException {
+ stack.push(node);
+ visit(context, plan, block, node.getChild(), stack);
+ stack.pop();
+ return node;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c40810..cfe3b61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -33,12 +33,8 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
import org.apache.tajo.catalog.exception.NoSuchTableException;
import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.FieldEval;
import org.apache.tajo.engine.exception.IllegalQueryStatusException;
import org.apache.tajo.engine.exception.VerifyException;
import org.apache.tajo.engine.parser.HiveConverter;
@@ -56,6 +52,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Stack;
import static org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
@@ -69,9 +66,10 @@ public class GlobalEngine extends AbstractService {
private SQLAnalyzer analyzer;
private HiveConverter converter;
private CatalogService catalog;
+ private PreLogicalPlanVerifier preVerifier;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
- private LogicalPlanVerifier verifier;
+ private LogicalPlanVerifier annotatedPlanVerifier;
private DistributedQueryHookManager hookManager;
public GlobalEngine(final MasterContext context) {
@@ -85,9 +83,10 @@ public class GlobalEngine extends AbstractService {
try {
analyzer = new SQLAnalyzer();
converter = new HiveConverter();
+ preVerifier = new PreLogicalPlanVerifier();
planner = new LogicalPlanner(context.getCatalog());
optimizer = new LogicalOptimizer(context.getConf());
- verifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
+ annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
hookManager = new DistributedQueryHookManager();
hookManager.addHook(new CreateTableHook());
@@ -134,7 +133,6 @@ public class GlobalEngine extends AbstractService {
context.getSystemMetrics().counter("Query", "totalQuery").inc();
Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
-
LogicalPlan plan = createLogicalPlan(planningContext);
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
@@ -234,14 +232,23 @@ public class GlobalEngine extends AbstractService {
private LogicalPlan createLogicalPlan(Expr expression) throws PlanningException {
+ VerificationState state = new VerificationState();
+ preVerifier.visit(state, new Stack<Expr>(), expression);
+ if (!state.verified()) {
+ StringBuilder sb = new StringBuilder();
+ for (String error : state.getErrorMessages()) {
+ sb.append(error).append("\n");
+ }
+ throw new VerifyException(sb.toString());
+ }
+
LogicalPlan plan = planner.createPlan(expression);
optimizer.optimize(plan);
if (LOG.isDebugEnabled()) {
LOG.debug("LogicalPlan:\n" + plan.getRootBlock().getRoot());
}
- VerificationState state = new VerificationState();
- verifier.visit(state, plan, plan.getRootBlock());
+ annotatedPlanVerifier.visit(state, plan, plan.getRootBlock());
if (!state.verified()) {
StringBuilder sb = new StringBuilder();
@@ -371,23 +378,18 @@ public class GlobalEngine extends AbstractService {
@Override
public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- if (rootNode.getChild().getType() == NodeType.STORE) {
- StoreTableNode storeTableNode = rootNode.getChild();
- return storeTableNode.isCreatedTable();
- } else {
- return false;
- }
+ return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
}
@Override
public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- StoreTableNode storeTableNode = rootNode.getChild();
- String tableName = storeTableNode.getTableName();
+ CreateTableNode createTableNode = rootNode.getChild();
+ String tableName = createTableNode.getTableName();
queryContext.setOutputTable(tableName);
queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
- if(storeTableNode.getPartitions() != null) {
- queryContext.setPartitions(storeTableNode.getPartitions());
+ if(createTableNode.getPartitions() != null) {
+ queryContext.setPartitions(createTableNode.getPartitions());
}
queryContext.setCreateTable();
}
@@ -405,122 +407,24 @@ public class GlobalEngine extends AbstractService {
queryContext.setInsert();
InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
- StoreTableNode storeNode;
// Set QueryContext settings, such as output table name and output path.
// It also remove data files if overwrite is true.
String outputTableName;
Path outputPath;
- CatalogProtos.StoreType storeType;
- Options options = new Options();
if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
- TableDesc desc = insertNode.getTargetTable();
- outputTableName = desc.getName();
- outputPath = desc.getPath();
- queryContext.setOutputTable(outputTableName);
-
- // set default values
- options.putAll(desc.getMeta().getOptions());
- storeType = desc.getMeta().getStoreType();
+ queryContext.setOutputTable(insertNode.getTableName());
+ queryContext.setOutputPath(insertNode.getPath());
} else { // INSERT INTO LOCATION ...
- outputTableName = PlannerUtil.normalizeTableName(insertNode.getPath().getName());
+ // When INSERT INTO LOCATION, must not set output table.
outputPath = insertNode.getPath();
queryContext.setFileOutput();
-
- // set default values
- options = new Options();
- storeType = CatalogProtos.StoreType.CSV;
+ queryContext.setOutputPath(outputPath);
}
- // overwrite the store type if store type is specified in the query statement
- if (insertNode.hasStorageType()) {
- storeType = insertNode.getStorageType();
- }
-
- // overwrite the table properties if they are specified in the query statement
- if (insertNode.hasOptions()) {
- options.putAll(insertNode.getOptions());
- }
-
- storeNode = new StoreTableNode(plan.newPID(), outputTableName);
- storeNode.setStorageType(storeType);
- storeNode.setOptions(options);
-
- // set OutputPath
- queryContext.setOutputPath(outputPath);
-
if (insertNode.isOverwrite()) {
queryContext.setOutputOverwrite();
}
-
- ////////////////////////////////////////////////////////////////////////////////////
- // [TARGET TABLE] [TARGET COLUMN] [SUBQUERY Schema] /
- // INSERT INTO TB_NAME (col1, col2) SELECT c1, c2 FROM ... /
- ////////////////////////////////////////////////////////////////////////////////////
- LogicalNode subQuery = insertNode.getSubQuery();
- Schema subQueryOutSchema = subQuery.getOutSchema();
-
- if (insertNode.hasTargetTable()) { // if a target table is given, it computes the proper schema.
- storeNode.getOptions().putAll(insertNode.getTargetTable().getMeta().toMap());
-
- Schema targetTableSchema = insertNode.getTargetTable().getSchema();
- Schema targetProjectedSchema = insertNode.getTargetSchema();
-
- int [] targetColumnIds = new int[targetProjectedSchema.getColumnNum()];
- int idx = 0;
- for (Column column : targetProjectedSchema.getColumns()) {
- targetColumnIds[idx++] = targetTableSchema.getColumnId(column.getQualifiedName());
- }
-
- Target [] targets = new Target[targetTableSchema.getColumnNum()];
- boolean matched = false;
- for (int i = 0; i < targetTableSchema.getColumnNum(); i++) {
- Column column = targetTableSchema.getColumn(i);
- for (int j = 0; j < targetColumnIds.length; j++) {
- if (targetColumnIds[j] == i) {
- Column outputColumn = subQueryOutSchema.getColumn(j);
- targets[i] = new Target(new FieldEval(outputColumn), column.getColumnName());
- matched = true;
- break;
- }
- }
- if (!matched) {
- targets[i] = new Target(new ConstEval(NullDatum.get()), column.getColumnName());
- }
- matched = false;
- }
-
-
- ProjectionNode projectionNode = new ProjectionNode(plan.newPID());
- projectionNode.setTargets(targets);
- projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
- List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());
- projectionNode.setChild(blocks.get(0).getRoot());
-
- storeNode.setOutSchema(projectionNode.getOutSchema());
- storeNode.setInSchema(projectionNode.getOutSchema());
- storeNode.setChild(projectionNode);
- } else {
- storeNode.setOutSchema(subQueryOutSchema);
- storeNode.setInSchema(subQueryOutSchema);
- List<LogicalPlan.QueryBlock> childBlocks = plan.getChildBlocks(plan.getRootBlock());
- storeNode.setChild(childBlocks.get(0).getRoot());
- }
-
- // If InsertNode contains table partition information, StoreNode must has it.
- if (insertNode.hasTargetTable()) {
- if (insertNode.getTargetTable().getPartitions() != null) {
- storeNode.setPartitions(insertNode.getTargetTable().getPartitions());
- }
- }
-
- // find a subquery query of insert node and merge root block and subquery into one query block.
- PlannerUtil.replaceNode(plan.getRootBlock().getRoot(), storeNode, NodeType.INSERT);
- plan.getRootBlock().refresh();
- LogicalPlan.QueryBlock subBlock = plan.getBlock(insertNode.getSubQuery());
- // remove the sub block and connection from a block graph.
- plan.removeBlock(subBlock);
- plan.getQueryBlockGraph().removeEdge(subBlock.getName(), LogicalPlan.ROOT_BLOCK);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 22c3a35..7e4540b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -35,12 +35,15 @@ import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -327,7 +330,7 @@ public class Query implements EventHandler<QueryEvent> {
if (queryContext.hasOutputTable()) { // TRUE only if a query command is 'CREATE TABLE' OR 'INSERT INTO'
if (queryContext.isOutputOverwrite()) { // TRUE only if a query is 'INSERT OVERWRITE INTO'
- catalog.deleteTable(finalOutputDir.getName());
+ catalog.deleteTable(finalTableDesc.getName());
}
catalog.addTable(finalTableDesc);
}
@@ -378,12 +381,25 @@ public class Query implements EventHandler<QueryEvent> {
Path finalOutputDir) {
// Determine the output table name
SubQuery subQuery = query.getSubQuery(finalExecBlockId);
- QueryContext queryContext = query.context.getQueryContext();
+
String outputTableName;
- if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
- outputTableName = queryContext.getOutputTable();
- } else { // SELECT STATEMENT
- outputTableName = query.getId().toString();
+ PartitionDesc partitionDesc = null;
+ QueryContext queryContext = query.context.getQueryContext();
+ if (subQuery.getBlock().getPlan().getType() == NodeType.CREATE_TABLE) {
+ CreateTableNode createTableNode = (CreateTableNode) subQuery.getBlock().getPlan();
+ outputTableName = createTableNode.getTableName();
+ if (createTableNode.hasPartition()) {
+ partitionDesc = createTableNode.getPartitions();
+ }
+ } else {
+ if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+ outputTableName = queryContext.getOutputTable();
+ } else { // SELECT STATEMENT
+ outputTableName = query.getId().toString();
+ }
+ if(queryContext.hasPartitions()) {
+ partitionDesc = queryContext.getPartitions();
+ }
}
TableMeta meta = subQuery.getTableMeta();
@@ -410,8 +426,8 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- if(queryContext.hasPartitions()) {
- finalTableDesc.setPartitions(queryContext.getPartitions());
+ if (partitionDesc != null) {
+ finalTableDesc.setPartitions(partitionDesc);
}
return finalTableDesc;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index cd93a8f..6758189 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -424,7 +424,7 @@ public class TestLogicalPlanner {
assertEquals(NodeType.ROOT, plan.getType());
LogicalRootNode root = (LogicalRootNode) plan;
- assertEquals(NodeType.STORE, root.getChild().getType());
+ assertEquals(NodeType.CREATE_TABLE, root.getChild().getType());
StoreTableNode storeNode = root.getChild();
testQuery7(storeNode.getChild());
}
@@ -776,31 +776,31 @@ public class TestLogicalPlanner {
public final void testInsertInto0() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[0]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertFalse(insertNode.isOverwrite());
assertTrue(insertNode.hasTargetTable());
- assertEquals("score", insertNode.getTargetTable().getName());
+ assertEquals("score", insertNode.getTableName());
}
@Test
public final void testInsertInto1() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[1]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertFalse(insertNode.isOverwrite());
- assertEquals("score", insertNode.getTargetTable().getName());
+ assertEquals("score", insertNode.getTableName());
}
@Test
public final void testInsertInto2() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[2]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertFalse(insertNode.isOverwrite());
- assertEquals("employee", insertNode.getTargetTable().getName());
+ assertEquals("employee", insertNode.getTableName());
assertTrue(insertNode.hasTargetSchema());
assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
@@ -810,7 +810,7 @@ public class TestLogicalPlanner {
public final void testInsertInto3() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[3]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertFalse(insertNode.isOverwrite());
assertTrue(insertNode.hasPath());
@@ -820,11 +820,11 @@ public class TestLogicalPlanner {
public final void testInsertInto4() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[4]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertTrue(insertNode.isOverwrite());
assertTrue(insertNode.hasTargetTable());
- assertEquals("employee", insertNode.getTargetTable().getName());
+ assertEquals("employee", insertNode.getTableName());
assertTrue(insertNode.hasTargetSchema());
assertEquals(insertNode.getTargetSchema().getColumn(0).getColumnName(), "name");
assertEquals(insertNode.getTargetSchema().getColumn(1).getColumnName(), "deptname");
@@ -834,7 +834,7 @@ public class TestLogicalPlanner {
public final void testInsertInto5() throws PlanningException {
Expr expr = sqlAnalyzer.parse(insertStatements[5]);
LogicalPlan plan = planner.createPlan(expr);
- assertEquals(2, plan.getQueryBlocks().size());
+ assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertTrue(insertNode.isOverwrite());
assertTrue(insertNode.hasPath());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d06dd852/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index b770de5..8b2db9f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -94,13 +94,13 @@ public class TestInsertQuery {
assertTrue(catalog.existsTable(tableName));
TableDesc originalDesc = catalog.getTableDesc(tableName);
- res = tpch.execute("insert overwrite into " + tableName
- + " (col1, col3) select l_orderkey, l_quantity from lineitem");
+ res = tpch.execute("insert overwrite into " + tableName + " (col1, col3) select l_orderkey, l_quantity from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(tableName);
assertEquals(5, desc.getStats().getNumRows().intValue());
res = tpch.execute("select * from " + tableName);
+
assertTrue(res.next());
assertEquals(1, res.getLong(1));
assertTrue(0f == res.getFloat(2));