Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/15 05:06:24 UTC
[11/19] carbondata git commit: [CARBONDATA-936] Parse partition table ddl This closes #882
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dae342bd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dae342bd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dae342bd
Branch: refs/heads/master
Commit: dae342bd57fe6d4704ad754b58438c1ec9d78516
Parents: f4d081e
Author: lionelcao <wh...@gmail.com>
Authored: Fri May 12 21:52:17 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun May 14 20:40:13 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 6 +-
.../core/metadata/schema/PartitionInfo.java | 29 ++--
.../src/main/resources/partition_data.csv | 25 ++++
.../examples/CarbonPartitionExample.scala | 131 +++++++++++++++++++
format/src/main/thrift/schema.thrift | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 37 +++++-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 100 +++++++++-----
.../execution/command/carbonTableSchema.scala | 5 +-
.../org/apache/spark/sql/TableCreator.scala | 5 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 37 +++---
10 files changed, 300 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b4235e2..33b0ef7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -816,8 +816,10 @@ public final class CarbonCommonConstants {
public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
public static final String DICTIONARY_INCLUDE = "dictionary_include";
public static final String SORT_COLUMNS = "sort_columns";
- public static final String PARTITIONCLASS = "partitionclass";
- public static final String PARTITIONCOUNT = "partitioncount";
+ public static final String PARTITION_TYPE = "partition_type";
+ public static final String NUM_PARTITIONS = "num_partitions";
+ public static final String RANGE_INFO = "range_info";
+ public static final String LIST_INFO = "list_info";
public static final String COLUMN_PROPERTIES = "columnproperties";
// table block size in MB
public static final String TABLE_BLOCKSIZE = "table_blocksize";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 86ef3c5..cd4ac0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -24,40 +24,29 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
/**
- * Partition Information of carbon partition table
+ * Partition information of carbon partition table
*/
public class PartitionInfo implements Serializable {
- /**
- * Partition columns
- */
private List<ColumnSchema> columnSchemaList;
- /**
- * partition type
- */
private PartitionType partitionType;
/**
- * Range Partition definition
+ * range information defined for range partition table
*/
private List<String> rangeInfo;
/**
- * List Partition definition
+ * value list defined for list partition table
*/
private List<List<String>> listInfo;
/**
- * Hash Partition numbers
+ * number of partitions
*/
- private int hashNumber;
+ private int numPartitions;
- /**
- * For range partition table
- * @param columnSchemaList
- * @param partitionType
- */
public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitionType) {
this.columnSchemaList = columnSchemaList;
this.partitionType = partitionType;
@@ -71,12 +60,12 @@ public class PartitionInfo implements Serializable {
return partitionType;
}
- public void setHashNumber(int hashNumber) {
- this.hashNumber = hashNumber;
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
}
- public int getHashNumber() {
- return hashNumber;
+ public int getNumPartitions() {
+ return numPartitions;
}
public void setRangeInfo(List<String> rangeInfo) {
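
A minimal sketch of how the renamed accessors are used, mirroring the getPartitionInfo code added to CarbonDDLSqlParser below; the column name and partition count are illustrative only:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.schema.PartitionInfo
    import org.apache.carbondata.core.metadata.schema.partition.PartitionType
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    // describe the single partition column
    val col = new ColumnSchema
    col.setColumnName("vin")

    // hash partition metadata now carries a generic partition count
    val info = new PartitionInfo(List(col).asJava, PartitionType.HASH)
    info.setNumPartitions(5)            // was setHashNumber(5)
    assert(info.getNumPartitions == 5)  // was getHashNumber()
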
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/examples/spark2/src/main/resources/partition_data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/partition_data.csv b/examples/spark2/src/main/resources/partition_data.csv
new file mode 100644
index 0000000..2d521b7
--- /dev/null
+++ b/examples/spark2/src/main/resources/partition_data.csv
@@ -0,0 +1,25 @@
+vin,logdate,phonenumber,country,area
+A42158424831,2016/02/12,125371341,China,Asia
+A42158473831,2016/01/12,125371342,China,Asia
+A42152474832,2016/02/12,125371343,US,America
+A42151477823,2016/12/12,125371344,China,Asia
+A42158474135,2016/02/15,125371345,Japan,Asia
+A42258434831,2016/12/12,125371346,China,Asia
+A42158475831,2016/05/12,125371347,UK,Europe
+A41158494830,2015/07/12,225371348,China,Asia
+A42158424831,2015/02/12,225371349,China,Asia
+A42158473830,2014/01/12,225371310,China,Asia
+A42152474830,2013/02/12,325371311,US,America
+A42151477823,2012/12/12,425371312,China,Asia
+A42158474133,2012/02/15,325371313,Japan,Asia
+A42258434835,2013/12/12,525371314,China,Asia
+A42158475836,2014/05/12,625371315,UK,Europe
+A41158494838,2015/07/12,525371316,China,Asia
+A42158424833,2016/02/12,425371317,China,Asia
+A42158473832,2017/01/12,325371318,China,Asia
+A42152474834,2011/02/12,225371319,US,America
+A42151477824,2012/12/12,225371320,China,Asia
+A42158474137,2013/02/15,325371321,Japan,Asia
+A42258434837,2014/12/12,25371322,China,Asia
+A42158475838,2014/05/12,425371323,UK,Europe
+A41158494839,2016/07/12,625371324,China,Asia
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
new file mode 100644
index 0000000..5f07b4b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonPartitionExample {
+
+ def main(args: Array[String]) {
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val testData = s"$rootPath/examples/spark2/src/main/resources/partition_data.csv"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ import org.apache.spark.sql.CarbonSession._
+
+ val spark = SparkSession
+ .builder()
+ .master("local")
+ .appName("CarbonPartitionExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ // none partition table
+ spark.sql("DROP TABLE IF EXISTS t0")
+
+ spark.sql("""
+ | CREATE TABLE IF NOT EXISTS t0
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Long,
+ | country String,
+ | area String
+ | )
+ | STORED BY 'carbondata'
+ """.stripMargin)
+
+ // range partition
+ spark.sql("DROP TABLE IF EXISTS t1")
+
+ spark.sql("""
+ | CREATE TABLE IF NOT EXISTS t1
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Long,
+ | country String,
+ | area String
+ | )
+ | PARTITIONED BY (logdate Timestamp)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='20140101, 2015/01/01 ,2016-01-01, ')
+ """.stripMargin)
+
+ // hash partition
+ spark.sql("DROP TABLE IF EXISTS t3")
+
+ spark.sql("""
+ | CREATE TABLE IF NOT EXISTS t3
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Long,
+ | country String,
+ | area String
+ | )
+ | PARTITIONED BY (vin String)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5')
+ """.stripMargin)
+
+ // list partition
+ spark.sql("DROP TABLE IF EXISTS t5")
+
+ spark.sql("""
+ | CREATE TABLE IF NOT EXISTS t5
+ | (
+ | vin String,
+ | logdate Timestamp,
+ | phonenumber Long,
+ | country String,
+ | area String
+ |)
+ | PARTITIONED BY (country string)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ')
+ """.stripMargin)
+
+ // spark.sql(s"""
+ // LOAD DATA LOCAL INPATH '$testData' into table t3
+ // options('BAD_RECORDS_ACTION'='FORCE')
+ // """)
+
+ // spark.sql("select vin, count(*) from t3 group by vin
+ // order by count(*) desc").show(50)
+
+ // Drop table
+ // spark.sql("DROP TABLE IF EXISTS t3")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 695333c..3385245 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -132,7 +132,7 @@ struct SchemaEvolution{
struct PartitionInfo{
1: required list<ColumnSchema> partition_columns;
2: required PartitionType partition_type;
- 3: optional i32 hash_number; // number of partitions defined in hash partition table
+ 3: optional i32 num_partitions; // number of partitions defined in hash partition table
4: optional list<list<string>> list_info; // value list of list partition table
5: optional list<string> range_info; // range value list of range partition table
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 679a4e7..5314a15 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
+import org.apache.spark.sql.types.StructField
import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -144,6 +145,40 @@ object CommonUtil {
isValid
}
+ /**
+ * 1. If partitioned by clause exists, then partition_type should be defined
+ * 2. If partition_type is Hash, then number_of_partitions should be defined
+ * 3. If partition_type is List, then list_info should be defined
+ * 4. If partition_type is Range, then range_info should be defined
+ * 5. Only support single level partition for now
+ * @param tableProperties
+ * @param partitionerFields
+ * @return partition clause and definition in tblproperties are valid or not
+ */
+ def validatePartitionColumns(tableProperties: Map[String, String],
+ partitionerFields: Seq[PartitionerField]): Boolean = {
+ var isValid: Boolean = true
+ val partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE)
+ val numPartitions = tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS)
+ val rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO)
+ val listInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO)
+
+ if (partitionType.isEmpty) {
+ isValid = false
+ } else {
+ partitionType.get.toUpperCase() match {
+ case "HASH" => if (!numPartitions.isDefined) isValid = false
+ case "LIST" => if (!listInfo.isDefined) isValid = false
+ case "RANGE" => if (!rangeInfo.isDefined) isValid = false
+ case "RANGE_INTERVAL" => isValid = false
+ case _ => isValid = false
+ }
+ // only support one partition column for now
+ if (partitionerFields.length > 1) isValid = false
+ }
+ isValid
+ }
+
def validateFields(key: String, fields: Seq[Field]): Boolean = {
var isValid: Boolean = false
fields.foreach { field =>
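
A usage sketch for the new validation helper, assuming the mutable Map type used at the parser call site and with property keys written in their lower-case constant form; column names and values are illustrative only:

    import scala.collection.mutable
    import org.apache.spark.sql.execution.command.PartitionerField
    import org.apache.carbondata.spark.util.CommonUtil

    val partCols = Seq(PartitionerField("vin", Some("string"), null))

    // hash partitioning must also define num_partitions
    CommonUtil.validatePartitionColumns(
      mutable.Map("partition_type" -> "hash", "num_partitions" -> "5"), partCols)  // true

    // a partitioned by clause without partition_type is rejected
    CommonUtil.validatePartitionColumns(mutable.Map[String, String](), partCols)   // false
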
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 0df3b31..2e9e61d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map}
import scala.language.implicitConversions
import scala.util.matching.Regex
@@ -33,10 +33,13 @@ import org.apache.spark.sql.execution.command._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.constants.LoggerAction
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
/**
* TODO remove the duplicate code and add the common methods to common class.
@@ -258,6 +261,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
// get no inverted index columns from table properties.
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+ // get partitionInfo
+ val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
// validate the tableBlockSize from table properties
CommonUtil.validateTableBlockSize(tableProperties)
@@ -275,7 +280,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
Option(noInvertedIdxCols),
groupCols,
Some(colProps),
- bucketFields: Option[BucketFields])
+ bucketFields: Option[BucketFields],
+ partitionInfo)
}
/**
@@ -347,43 +353,69 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
}
/**
- * For getting the partitioner Object
- *
* @param partitionCols
* @param tableProperties
- * @return
*/
- protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]):
- Option[Partitioner] = {
-
- // by default setting partition class empty.
- // later in table schema it is setting to default value.
- var partitionClass: String = ""
- var partitionCount: Int = 1
- var partitionColNames: Array[String] = Array[String]()
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
- partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
- }
-
- if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
- try {
- partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
- } catch {
- case e: Exception => // no need to do anything.
+ protected def getPartitionInfo(partitionCols: Seq[PartitionerField],
+ tableProperties: Map[String, String]): Option[PartitionInfo] = {
+ if (partitionCols.isEmpty) {
+ None
+ } else {
+ var partitionType: String = ""
+ var numPartitions = 0
+ var rangeInfo = List[String]()
+ var listInfo = ListBuffer[List[String]]()
+ var templist = ListBuffer[String]()
+ if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) {
+ partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get
}
- }
-
- partitionCols.foreach(col =>
- partitionColNames :+= col.partitionColumn
- )
+ if (tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS).isDefined) {
+ numPartitions = tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS).get
+ .toInt
+ }
+ if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) {
+ rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.split(",")
+ .map(_.trim()).toList
+ }
+ if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) {
+ val arr = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.split(",")
+ .map(_.trim())
+ val iter = arr.iterator
+ while (iter.hasNext) {
+ val value = iter.next()
+ if (value.startsWith("(")) {
+ templist += value.replace("(", "").trim()
+ } else if (value.endsWith(")")) {
+ templist += value.replace(")", "").trim()
+ listInfo += templist.toList
+ templist.clear()
+ } else {
+ templist += value
+ listInfo += templist.toList
+ templist.clear()
+ }
+ }
+ }
+ val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
+ partitionCols.foreach(partition_col => {
+ val columnSchema = new ColumnSchema
+ columnSchema.setDataType(DataTypeConverterUtil.
+ convertToCarbonType(partition_col.dataType.get))
+ columnSchema.setColumnName(partition_col.partitionColumn)
+ cols += columnSchema
+ })
- // this means user has given partition cols list
- if (!partitionColNames.isEmpty) {
- return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+ var partitionInfo : PartitionInfo = null
+ partitionType.toUpperCase() match {
+ case "HASH" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH)
+ partitionInfo.setNumPartitions(numPartitions)
+ case "RANGE" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE)
+ partitionInfo.setRangeInfo(rangeInfo.asJava)
+ case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
+ partitionInfo.setListInfo(listInfo.map(_.asJava).toList.asJava)
+ }
+ Some(partitionInfo)
}
- // if partition cols are not given then no need to do partition.
- None
}
protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
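
The LIST_INFO grouping above is easiest to see in isolation. A standalone sketch of the same algorithm (values are split on ',' and trimmed; a leading '(' opens a multi-value partition, a trailing ')' closes it, anything else becomes a single-value partition):

    import scala.collection.mutable.ListBuffer

    def groupListInfo(listInfo: String): List[List[String]] = {
      val groups = ListBuffer[List[String]]()
      val current = ListBuffer[String]()
      listInfo.split(",").map(_.trim).foreach { value =>
        if (value.startsWith("(")) {
          // open a multi-value partition
          current += value.replace("(", "").trim
        } else if (value.endsWith(")")) {
          // close the multi-value partition
          current += value.replace(")", "").trim
          groups += current.toList
          current.clear()
        } else {
          // single-value partition
          current += value
          groups += current.toList
          current.clear()
        }
      }
      groups.toList
    }

    // groupListInfo("(China,United States),UK,japan,(Canada,Russia),South Korea") ==
    //   List(List("China", "United States"), List("UK"), List("japan"),
    //        List("Canada", "Russia"), List("South Korea"))
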
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 187512d..6f50ecc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.service.CarbonCommonFactory
@@ -60,7 +60,8 @@ case class TableModel(
columnGroups: Seq[String],
colProps: Option[util.Map[String,
util.List[ColumnProperty]]] = None,
- bucketFields: Option[BucketFields])
+ bucketFields: Option[BucketFields],
+ partitionInfo: Option[PartitionInfo])
case class Field(column: String, var dataType: Option[String], name: Option[String],
children: Option[List[Field]], parent: String = null,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 068f852..3aa2d4e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -484,6 +484,8 @@ object TableCreator {
// get no inverted index columns from table properties.
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+ val partitionInfo = None
+
// validate the tableBlockSize from table properties
CommonUtil.validateTableBlockSize(tableProperties)
@@ -500,7 +502,8 @@ object TableCreator {
Option(noInvertedIdxCols),
groupCols,
Some(colProps),
- bucketFields: Option[BucketFields])
+ bucketFields: Option[BucketFields],
+ partitionInfo)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index dc7e608..eb134d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext,
TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
+PartitionerField, TableModel}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.carbondata.spark.CarbonOption
@@ -96,7 +97,9 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
- val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+ val partitionerFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+ .map( structField =>
+ PartitionerField(structField.name, Some(structField.dataType.toString), null))
val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
.getOrElse(Map.empty)
@@ -111,21 +114,23 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
duplicateColumns.mkString("[", ",", "]"), ctx)
}
- // For Hive tables, partition columns must not be part of the schema
- val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
- if (badPartCols.nonEmpty) {
- operationNotAllowed(s"Partition columns may not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
- }
-
- // Note: Hive requires partition columns to be distinct from the schema, so we need
- // to include the partition columns here explicitly
- val schema = cols ++ partitionCols
+ val tableProperties = mutable.Map[String, String]()
+ properties.foreach{property => tableProperties.put(property._1, property._2.toLowerCase)}
- val fields = parser.getFields(schema)
+ // validate partition clause
+ if (partitionerFields.nonEmpty) {
+ if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+ throw new MalformedCarbonCommandException("Invalid partition definition")
+ }
+ // partition columns must be part of the schema
+ val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+ if (badPartCols.isEmpty) {
+ operationNotAllowed(s"Partition columns must be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+ }
+ }
- val tableProperties = mutable.Map[String, String]()
- properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
+ val fields = parser.getFields(cols)
val options = new CarbonOption(properties)
// validate tblProperties
@@ -136,7 +141,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
convertDbNameToLowerCase(name.database),
name.table.toLowerCase,
fields,
- Seq(),
+ partitionerFields,
tableProperties,
bucketFields)
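
With this change a PARTITIONED BY clause is only accepted when a matching partition definition exists in TBLPROPERTIES. A behaviour sketch, assuming a CarbonSession named spark as in CarbonPartitionExample above; the table name t_bad is hypothetical:

    spark.sql("DROP TABLE IF EXISTS t_bad")

    // fails with MalformedCarbonCommandException("Invalid partition definition"):
    // PARTITIONED BY is present but PARTITION_TYPE is not defined in TBLPROPERTIES
    spark.sql("""
      | CREATE TABLE t_bad
      | (
      | vin String,
      | logdate Timestamp
      | )
      | PARTITIONED BY (logdate Timestamp)
      | STORED BY 'carbondata'
      """.stripMargin)
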