You are viewing a plain-text version of this content; the link to its canonical version ("here") was not preserved in this copy.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/31 09:50:59 UTC
[07/10] git commit: TAJO-460: CTAS statement should support partitioned table. (Min Zhou via hyunsik)
TAJO-460: CTAS statement should support partitioned table. (Min Zhou via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d39bb998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d39bb998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d39bb998
Branch: refs/heads/DAG-execplan
Commit: d39bb99809384c25214f4847c8de06e9ac012e98
Parents: 2a41930
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Dec 30 12:41:09 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 30 12:41:09 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../tajo/catalog/partition/PartitionDesc.java | 4 +
.../tajo/engine/planner/LogicalPlanner.java | 10 ++
.../apache/tajo/engine/query/QueryContext.java | 14 +++
.../org/apache/tajo/master/GlobalEngine.java | 3 +
.../apache/tajo/master/querymaster/Query.java | 5 +
.../apache/tajo/engine/query/TestCTASQuery.java | 97 ++++++++++++++++++++
.../create_partitioned_table_as_select.sql | 6 ++
8 files changed, 142 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac27e63..fae9d1d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.8.0 - unreleased
NEW FEATURES
+ TAJO-460: CTAS statement should support partitioned table.
+ (Min Zhou via hyunsik)
+
TAJO-381: Implement find_in_set function. (Jae Young Lee via hyunsik)
TAJO-439: Time literal support. (DaeMyung Kang via jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
index fbec807..792d642 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -191,4 +191,8 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
return CatalogGsonHelper.toJson(this, PartitionDesc.class);
}
+
+  public static PartitionDesc fromJson(String strVal) { // counterpart of toJson(); null in, null out
+    return strVal != null ? CatalogGsonHelper.fromJson(strVal, PartitionDesc.class) : null;
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index fea3361..04bc27b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -759,9 +759,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
storeNode.setChild(subQuery);
if (expr.hasTableElements()) {
+ // CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
Schema schema = convertTableElementsSchema(expr.getTableElements());
storeNode.setOutSchema(schema);
} else {
+ // CREATE TABLE tbl AS SELECT ...
storeNode.setOutSchema(subQuery.getOutSchema());
}
storeNode.setInSchema(subQuery.getOutSchema());
@@ -779,6 +781,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
storeNode.setOptions(options);
}
+ if (expr.hasPartition()) {
+ storeNode.setPartitions(convertTableElementsPartition(context, expr));
+ }
+
return storeNode;
} else {
Schema tableSchema;
@@ -1017,12 +1023,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
Schema targetSchema = new Schema();
if (expr.hasTargetColumns()) {
+      // INSERT OVERWRITE INTO TABLE tbl(col1, col2) SELECT ...
String [] targetColumnNames = expr.getTargetColumns();
for (int i = 0; i < targetColumnNames.length; i++) {
Column targetColumn = context.plan.resolveColumn(context.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
targetSchema.addColumn(targetColumn);
}
} else {
+      // No target columns given (INSERT OVERWRITE INTO TABLE tbl SELECT ...):
+      // use the first N columns of the target table's schema, where N is the
+      // number of columns produced by the select clause.
Schema targetTableSchema = desc.getSchema();
for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
targetSchema.addColumn(targetTableSchema.getColumn(i));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 26fd8c0..e710f9d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.NodeType;
@@ -36,6 +37,7 @@ public class QueryContext extends Options {
public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
+ public static final String OUTPUT_PARTITIONS = "tajo.query.output.partitions";
public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
@@ -128,6 +130,18 @@ public class QueryContext extends Options {
return strVal != null ? new Path(strVal) : null;
}
+  public boolean hasPartitions() { // true when a non-null value is stored under OUTPUT_PARTITIONS
+    return get(OUTPUT_PARTITIONS) != null;
+  }
+
+  public void setPartitions(PartitionDesc partitionDesc) { // stores partitionDesc.toJson(), or null
+    put(OUTPUT_PARTITIONS, partitionDesc != null ? partitionDesc.toJson() : null);
+  }
+
+  public PartitionDesc getPartitions() { // deserializes via PartitionDesc.fromJson; null if get() is null
+    return PartitionDesc.fromJson(get(OUTPUT_PARTITIONS));
+  }
+
public void setOutputOverwrite() {
setBool(OUTPUT_OVERWRITE, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index d17ec91..7336f2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -371,6 +371,9 @@ public class GlobalEngine extends AbstractService {
String tableName = storeTableNode.getTableName();
queryContext.setOutputTable(tableName);
queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
+ if(storeTableNode.getPartitions() != null) {
+ queryContext.setPartitions(storeTableNode.getPartitions());
+ }
queryContext.setCreateTable();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 79ae34d..f8c335b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -410,6 +410,11 @@ public class Query implements EventHandler<QueryEvent> {
finalTableDesc = updatingTable;
}
}
+
+ if(queryContext.hasPartitions()) {
+ finalTableDesc.setPartitions(queryContext.getPartitions());
+ }
+
return finalTableDesc;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
new file mode 100644
index 0000000..1ff3b97
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test CREATE TABLE AS SELECT statements
+ */
+@Category(IntegrationTest.class)
+public class TestCTASQuery {
+  private static TpchTestBase tpch;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  /**
+   * CTAS into a table partitioned by column {@code key}: checks the catalog records a COLUMN
+   * partition description, a directory exists for each expected partition value, and
+   * filtering on the partition key returns exactly the expected rows.
+   */
+  @Test
+  public final void testCtasWithColumnedPartition() throws Exception {
+    String tableName = "testCtasWithColumnedPartition";
+    tpch.execute(
+        "create table " + tableName
+        + " (col1 int4, col2 int4) partition by column(key float8) "
+        + " as select l_orderkey as col1, l_partkey as col2, l_quantity as key from lineitem");
+
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    // check existence first so a missing table fails here rather than with an NPE below
+    assertTrue(catalog.existsTable(tableName));
+    TableDesc desc = catalog.getTableDesc(tableName);
+    PartitionDesc partitionDesc = desc.getPartitions();
+    assertNotNull(partitionDesc);
+    assertEquals(CatalogProtos.PartitionsType.COLUMN, partitionDesc.getPartitionsType());
+    assertEquals("key", partitionDesc.getColumns().get(0).getColumnName());
+
+    // one sub-directory per distinct value of the partition column
+    FileSystem fs = FileSystem.get(cluster.getConfiguration());
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(path));
+    String[] partitions = {"key=17.0", "key=36.0", "key=38.0", "key=45.0", "key=49.0"};
+    for (String partition : partitions) {
+      assertTrue(partition, fs.isDirectory(new Path(path.toUri() + "/" + partition)));
+    }
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    ResultSet res = tpch.execute(
+        "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0)");
+
+    // expected (col1, col2) for each selected partition key
+    Map<Double, int []> expectedRows = Maps.newHashMap();
+    expectedRows.put(45.0d, new int[]{3, 2});
+    expectedRows.put(38.0d, new int[]{2, 2});
+
+    int rowCount = 0;
+    try {
+      while (res.next()) {
+        double key = res.getDouble(3);
+        int [] expected = expectedRows.get(key);
+        assertNotNull("unexpected partition key: " + key, expected);
+        assertEquals(expected[0], res.getInt(1));
+        assertEquals(expected[1], res.getInt(2));
+        rowCount++;
+      }
+    } finally {
+      res.close();
+    }
+    // without this, an empty result set would let the test pass vacuously
+    assertEquals(expectedRows.size(), rowCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql b/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
new file mode 100644
index 0000000..09b14eb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
@@ -0,0 +1,6 @@
+CREATE TABLE sales ( col1 int, col2 int)
+PARTITION BY COLUMN (col3 int, col4 float, col5 text)
+AS SELECT col1, col2, col3, col4, col5 FROM sales_src
+ WHERE col1 > 16
+
+