You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/31 09:50:59 UTC

[07/10] git commit: TAJO-460: CTAS statement should support partitioned table. (Min Zhou via hyunsik)

TAJO-460: CTAS statement should support partitioned table. (Min Zhou via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/d39bb998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/d39bb998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/d39bb998

Branch: refs/heads/DAG-execplan
Commit: d39bb99809384c25214f4847c8de06e9ac012e98
Parents: 2a41930
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Dec 30 12:41:09 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 30 12:41:09 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../tajo/catalog/partition/PartitionDesc.java   |  4 +
 .../tajo/engine/planner/LogicalPlanner.java     | 10 ++
 .../apache/tajo/engine/query/QueryContext.java  | 14 +++
 .../org/apache/tajo/master/GlobalEngine.java    |  3 +
 .../apache/tajo/master/querymaster/Query.java   |  5 +
 .../apache/tajo/engine/query/TestCTASQuery.java | 97 ++++++++++++++++++++
 .../create_partitioned_table_as_select.sql      |  6 ++
 8 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac27e63..fae9d1d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.8.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-460: CTAS statement should support partitioned table.
+    (Min Zhou via hyunsik)
+
     TAJO-381: Implement find_in_set function. (Jae Young Lee via hyunsik)
 
     TAJO-439: Time literal support. (DaeMyung Kang via jihoon)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
index fbec807..792d642 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -191,4 +191,8 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
     return CatalogGsonHelper.toJson(this, PartitionDesc.class);
 
   }
+
+  public static PartitionDesc fromJson(String strVal) {
+    return strVal != null ? CatalogGsonHelper.fromJson(strVal, PartitionDesc.class) : null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index fea3361..04bc27b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -759,9 +759,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       storeNode.setChild(subQuery);
 
       if (expr.hasTableElements()) {
+        // CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
         Schema schema = convertTableElementsSchema(expr.getTableElements());
         storeNode.setOutSchema(schema);
       } else {
+        // CREATE TABLE tbl AS SELECT ...
         storeNode.setOutSchema(subQuery.getOutSchema());
       }
       storeNode.setInSchema(subQuery.getOutSchema());
@@ -779,6 +781,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         storeNode.setOptions(options);
       }
 
+      if (expr.hasPartition()) {
+        storeNode.setPartitions(convertTableElementsPartition(context, expr));
+      }
+
       return storeNode;
     } else {
       Schema tableSchema;
@@ -1017,12 +1023,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
       Schema targetSchema = new Schema();
       if (expr.hasTargetColumns()) {
+        // INSERT OVERWRITE INTO TABLE tbl(col1 type, col2 type) SELECT ...
         String [] targetColumnNames = expr.getTargetColumns();
         for (int i = 0; i < targetColumnNames.length; i++) {
           Column targetColumn = context.plan.resolveColumn(context.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
           targetSchema.addColumn(targetColumn);
         }
       } else {
+        // No target columns were specified, as in the statement below, so take the
+        // leading columns of the target table's schema, one per select output column:
+        // INSERT OVERWRITE INTO TABLE tbl SELECT ...
         Schema targetTableSchema = desc.getSchema();
         for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
           targetSchema.addColumn(targetTableSchema.getColumn(i));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 26fd8c0..e710f9d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.NodeType;
@@ -36,6 +37,7 @@ public class QueryContext extends Options {
 
   public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
   public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
+  public static final String OUTPUT_PARTITIONS = "tajo.query.output.partitions";
   public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
   public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
 
@@ -128,6 +130,18 @@ public class QueryContext extends Options {
     return strVal != null ? new Path(strVal) : null;
   }
 
+  public boolean hasPartitions() {
+    return get(OUTPUT_PARTITIONS) != null;
+  }
+
+  public void setPartitions(PartitionDesc partitionDesc) {
+    put(OUTPUT_PARTITIONS, partitionDesc != null ? partitionDesc.toJson() : null);
+  }
+
+  public PartitionDesc getPartitions() {
+    return PartitionDesc.fromJson(get(OUTPUT_PARTITIONS));
+  }
+
   public void setOutputOverwrite() {
     setBool(OUTPUT_OVERWRITE, true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index d17ec91..7336f2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -371,6 +371,9 @@ public class GlobalEngine extends AbstractService {
       String tableName = storeTableNode.getTableName();
       queryContext.setOutputTable(tableName);
       queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
+      if(storeTableNode.getPartitions() != null) {
+        queryContext.setPartitions(storeTableNode.getPartitions());
+      }
       queryContext.setCreateTable();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 79ae34d..f8c335b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -410,6 +410,11 @@ public class Query implements EventHandler<QueryEvent> {
           finalTableDesc = updatingTable;
         }
       }
+
+      if(queryContext.hasPartitions()) {
+        finalTableDesc.setPartitions(queryContext.getPartitions());
+      }
+
       return finalTableDesc;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
new file mode 100644
index 0000000..1ff3b97
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test CREATE TABLE AS SELECT statements
+ */
+@Category(IntegrationTest.class)
+public class TestCTASQuery {
+  private static TpchTestBase tpch;
+  public TestCTASQuery() throws IOException {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  @Test
+  public final void testCtasWithColumnedPartition() throws Exception {
+    String tableName ="testCtasWithColumnedPartition";
+    tpch.execute(
+        "create table " + tableName
+        + " (col1 int4, col2 int4) partition by column(key float8) "
+        + " as select l_orderkey as col1, l_partkey as col2, l_quantity as key from lineitem");
+
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertTrue(catalog.existsTable(tableName));
+    PartitionDesc partitionDesc = desc.getPartitions();
+    assertEquals(partitionDesc.getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
+    assertEquals("key", partitionDesc.getColumns().get(0).getColumnName());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    ResultSet res = tpch.execute(
+        "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0)");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 3 && res.next(); i++) {
+      assertEquals(resultRows1.get(res.getDouble(3))[0], res.getInt(1));
+      assertEquals(resultRows1.get(res.getDouble(3))[1], res.getInt(2));
+    }
+    res.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d39bb998/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql b/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
new file mode 100644
index 0000000..09b14eb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/queries/create_partitioned_table_as_select.sql
@@ -0,0 +1,6 @@
+CREATE TABLE sales ( col1 int, col2 int)
+PARTITION BY COLUMN (col3 int, col4 float, col5 text)
+AS SELECT col1, col2, col3, col4, col5 FROM sales_src
+   WHERE col1 > 16
+
+