You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/08 09:48:14 UTC

[02/14] incubator-carbondata git commit: add partition validate & example

add partition validate & example


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/28e7981d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/28e7981d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/28e7981d

Branch: refs/heads/12-dev
Commit: 28e7981d3dea7570532d50c614e15fafbf98e650
Parents: c95b311
Author: lionelcao <wh...@gmail.com>
Authored: Tue Apr 25 17:49:18 2017 +0800
Committer: lionelcao <wh...@gmail.com>
Committed: Thu May 4 14:53:09 2017 +0800

----------------------------------------------------------------------
 .../src/main/resources/partition_data.csv       | 25 ++++++
 .../examples/CarbonPartitionExample.scala       | 91 ++++++++++++++++++++
 .../carbondata/spark/util/CommonUtil.scala      | 34 ++++++++
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 20 +++--
 4 files changed, 163 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/28e7981d/examples/spark2/src/main/resources/partition_data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/partition_data.csv b/examples/spark2/src/main/resources/partition_data.csv
new file mode 100644
index 0000000..2d521b7
--- /dev/null
+++ b/examples/spark2/src/main/resources/partition_data.csv
@@ -0,0 +1,25 @@
+vin,logdate,phonenumber,country,area
+A42158424831,2016/02/12,125371341,China,Asia
+A42158473831,2016/01/12,125371342,China,Asia
+A42152474832,2016/02/12,125371343,US,America
+A42151477823,2016/12/12,125371344,China,Asia
+A42158474135,2016/02/15,125371345,Japan,Asia
+A42258434831,2016/12/12,125371346,China,Asia
+A42158475831,2016/05/12,125371347,UK,Europe
+A41158494830,2015/07/12,225371348,China,Asia
+A42158424831,2015/02/12,225371349,China,Asia
+A42158473830,2014/01/12,225371310,China,Asia
+A42152474830,2013/02/12,325371311,US,America
+A42151477823,2012/12/12,425371312,China,Asia
+A42158474133,2012/02/15,325371313,Japan,Asia
+A42258434835,2013/12/12,525371314,China,Asia
+A42158475836,2014/05/12,625371315,UK,Europe
+A41158494838,2015/07/12,525371316,China,Asia
+A42158424833,2016/02/12,425371317,China,Asia
+A42158473832,2017/01/12,325371318,China,Asia
+A42152474834,2011/02/12,225371319,US,America
+A42151477824,2012/12/12,225371320,China,Asia
+A42158474137,2013/02/15,325371321,Japan,Asia
+A42258434837,2014/12/12,25371322,China,Asia
+A42158475838,2014/05/12,425371323,UK,Europe
+A41158494839,2016/07/12,625371324,China,Asia

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/28e7981d/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
new file mode 100644
index 0000000..96c1c72
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonPartitionExample {
+  // Starts a local CarbonSession, then recreates one plain and one hash-partitioned table.
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val testData = s"$rootPath/examples/spark2/src/main/resources/partition_data.csv"
+    // partition_data.csv writes logdate as yyyy/MM/dd (e.g. 2016/02/12)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonPartitionExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    spark.sql("DROP TABLE IF EXISTS non_partition_table")
+
+    spark.sql("""
+                | CREATE TABLE IF NOT EXISTS non_partition_table
+                | (
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | area String
+                | )
+                | STORED BY 'carbondata'
+              """.stripMargin)
+
+    spark.sql("DROP TABLE IF EXISTS partition_table")
+
+    spark.sql("""
+       | CREATE TABLE IF NOT EXISTS partition_table
+       | (
+       | vin String,
+       | logdate Timestamp,
+       | phonenumber Long,
+       | country String,
+       | area String
+       | )
+       | PARTITIONED BY (vin String)
+       | STORED BY 'carbondata'
+       | TBLPROPERTIES('PARTITIONING'='HASH','PARTITIONCOUNT'='5')
+       """.stripMargin)
+    // NOTE(review): the disabled statements below name tables this example never creates.
+    //spark.sql(s"""
+    //   LOAD DATA LOCAL INPATH '$testData' into table rx5_tbox_parquet_all options('BAD_RECORDS_ACTION'='FORCE')
+    //   """)
+
+    //spark.sql("select vin, count(*) from rx5_tbox_parquet_all group by vin order by count(*) desc").show(50)
+
+    // Drop table (disabled; see the review note above)
+    //spark.sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/28e7981d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 679a4e7..774261d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
+import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -144,6 +145,39 @@ object CommonUtil {
     isValid
   }
 
+  /**
+   * Validates a table's partition definition. Rules applied by the code below:
+   * 1. Partition columns and the partitioning property must both be present or both be absent
+   * 2. If partitioning is HASH (compared case-insensitively), partitioncount must be defined
+   * 3. LIST, RANGE, RANGE_INTERVAL and any other partitioning value are currently rejected
+   * 4. Only a single partition column (single level partition) is accepted for now
+   * @param tableProperties properties read via CarbonCommonConstants.PARTITIONING / PARTITIONCOUNT
+   * @param partitionCols the partition columns to validate
+   * @return true if the definition passes every rule above, false otherwise
+   */
+  def validatePartitionColumns(tableProperties: Map[String, String],
+      partitionCols: Seq[StructField]): Boolean = {
+    var isValid: Boolean = true
+    val partitioning = tableProperties.get(CarbonCommonConstants.PARTITIONING)
+    val partitioncount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT)
+
+    // partition columns and partitioning must either both exist or both be absent
+    if (partitionCols.isEmpty ^ partitioning.isEmpty) {
+      isValid = false
+    } else if (partitionCols.nonEmpty) {
+      partitioning.get.toUpperCase() match {
+        case "HASH" => if (!partitioncount.isDefined) isValid = false
+        case "LIST" => isValid = false
+        case "RANGE" => isValid = false
+        case "RANGE_INTERVAL" => isValid = false
+        case _ => isValid = false
+      }
+    }
+    // only one partition column is supported for now
+    if (partitionCols.length > 1) isValid = false
+    isValid
+  }
+
   def validateFields(key: String, fields: Seq[Field]): Boolean = {
     var isValid: Boolean = false
     fields.foreach { field =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/28e7981d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index b22422b..9aacc7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -114,11 +114,19 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
                             duplicateColumns.mkString("[", ",", "]"), ctx)
       }
 
-      // partition columns must be part of the schema
-      val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-      if (badPartCols.isEmpty) {
-        operationNotAllowed(s"Partition columns must be specified in the schema: " +
-                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+      val tableProperties = mutable.Map[String, String]()
+      properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
+
+      // validate partition clause
+      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionCols)) {
+        operationNotAllowed("Invalid Partition definition", ctx)
+      } else if (partitionCols.nonEmpty) {
+        // partition columns must be part of the schema
+        val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
+        if (badPartCols.isEmpty) {
+          operationNotAllowed(s"Partition columns must be specified in the schema: " +
+                              badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+        }
       }
 
       val fields = cols.map { col =>
@@ -171,8 +179,6 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         None
       }
 
-      val tableProperties = mutable.Map[String, String]()
-      properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
       // prepare table model of the collected tokens
       val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
         convertDbNameToLowerCase(name.database),