You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/15 05:06:24 UTC

[11/19] carbondata git commit: [CARBONDATA-936] Parse partition table ddl This closes #882

[CARBONDATA-936] Parse partition table ddl  This closes #882


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dae342bd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dae342bd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dae342bd

Branch: refs/heads/master
Commit: dae342bd57fe6d4704ad754b58438c1ec9d78516
Parents: f4d081e
Author: lionelcao <wh...@gmail.com>
Authored: Fri May 12 21:52:17 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun May 14 20:40:13 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +-
 .../core/metadata/schema/PartitionInfo.java     |  29 ++--
 .../src/main/resources/partition_data.csv       |  25 ++++
 .../examples/CarbonPartitionExample.scala       | 131 +++++++++++++++++++
 format/src/main/thrift/schema.thrift            |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  37 +++++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 100 +++++++++-----
 .../execution/command/carbonTableSchema.scala   |   5 +-
 .../org/apache/spark/sql/TableCreator.scala     |   5 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  37 +++---
 10 files changed, 300 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index b4235e2..33b0ef7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -816,8 +816,10 @@ public final class CarbonCommonConstants {
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
   public static final String SORT_COLUMNS = "sort_columns";
-  public static final String PARTITIONCLASS = "partitionclass";
-  public static final String PARTITIONCOUNT = "partitioncount";
+  public static final String PARTITION_TYPE = "partition_type";
+  public static final String NUM_PARTITIONS = "num_partitions";
+  public static final String RANGE_INFO = "range_info";
+  public static final String LIST_INFO = "list_info";
   public static final String COLUMN_PROPERTIES = "columnproperties";
   // table block size in MB
   public static final String TABLE_BLOCKSIZE = "table_blocksize";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 86ef3c5..cd4ac0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -24,40 +24,29 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
- * Partition Information of carbon partition table
+ * Partition information of carbon partition table
  */
 public class PartitionInfo implements Serializable {
 
-  /**
-   * Partition columns
-   */
   private List<ColumnSchema> columnSchemaList;
 
-  /**
-   * partition type
-   */
   private PartitionType partitionType;
 
   /**
-   * Range Partition definition
+   * range information defined for range partition table
    */
   private List<String> rangeInfo;
 
   /**
-   * List Partition definition
+   * value list defined for list partition table
    */
   private List<List<String>> listInfo;
 
   /**
-   * Hash Partition numbers
+   * number of partitions
    */
-  private int hashNumber;
+  private int numPartitions;
 
-  /**
-   * For range partition table
-   * @param columnSchemaList
-   * @param partitionType
-   */
   public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitionType) {
     this.columnSchemaList = columnSchemaList;
     this.partitionType = partitionType;
@@ -71,12 +60,12 @@ public class PartitionInfo implements Serializable {
     return partitionType;
   }
 
-  public void setHashNumber(int hashNumber) {
-    this.hashNumber = hashNumber;
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
   }
 
-  public int getHashNumber() {
-    return hashNumber;
+  public int getNumPartitions() {
+    return numPartitions;
   }
 
   public void setRangeInfo(List<String> rangeInfo) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/examples/spark2/src/main/resources/partition_data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/partition_data.csv b/examples/spark2/src/main/resources/partition_data.csv
new file mode 100644
index 0000000..2d521b7
--- /dev/null
+++ b/examples/spark2/src/main/resources/partition_data.csv
@@ -0,0 +1,25 @@
+vin,logdate,phonenumber,country,area
+A42158424831,2016/02/12,125371341,China,Asia
+A42158473831,2016/01/12,125371342,China,Asia
+A42152474832,2016/02/12,125371343,US,America
+A42151477823,2016/12/12,125371344,China,Asia
+A42158474135,2016/02/15,125371345,Japan,Asia
+A42258434831,2016/12/12,125371346,China,Asia
+A42158475831,2016/05/12,125371347,UK,Europe
+A41158494830,2015/07/12,225371348,China,Asia
+A42158424831,2015/02/12,225371349,China,Asia
+A42158473830,2014/01/12,225371310,China,Asia
+A42152474830,2013/02/12,325371311,US,America
+A42151477823,2012/12/12,425371312,China,Asia
+A42158474133,2012/02/15,325371313,Japan,Asia
+A42258434835,2013/12/12,525371314,China,Asia
+A42158475836,2014/05/12,625371315,UK,Europe
+A41158494838,2015/07/12,525371316,China,Asia
+A42158424833,2016/02/12,425371317,China,Asia
+A42158473832,2017/01/12,325371318,China,Asia
+A42152474834,2011/02/12,225371319,US,America
+A42151477824,2012/12/12,225371320,China,Asia
+A42158474137,2013/02/15,325371321,Japan,Asia
+A42258434837,2014/12/12,25371322,China,Asia
+A42158475838,2014/05/12,425371323,UK,Europe
+A41158494839,2016/07/12,625371324,China,Asia

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
new file mode 100644
index 0000000..5f07b4b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonPartitionExample {
+
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target"
+    val testData = s"$rootPath/examples/spark2/src/main/resources/partition_data.csv"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonPartitionExample")
+      .config("spark.sql.warehouse.dir", warehouse)
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    // none partition table
+    spark.sql("DROP TABLE IF EXISTS t0")
+
+    spark.sql("""
+                | CREATE TABLE IF NOT EXISTS t0
+                | (
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | area String
+                | )
+                | STORED BY 'carbondata'
+              """.stripMargin)
+
+    // range partition
+    spark.sql("DROP TABLE IF EXISTS t1")
+
+    spark.sql("""
+                | CREATE TABLE IF NOT EXISTS t1
+                | (
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | area String
+                | )
+                | PARTITIONED BY (logdate Timestamp)
+                | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+                | 'RANGE_INFO'='20140101, 2015/01/01 ,2016-01-01, ')
+              """.stripMargin)
+
+    // hash partition
+    spark.sql("DROP TABLE IF EXISTS t3")
+
+    spark.sql("""
+                | CREATE TABLE IF NOT EXISTS t3
+                | (
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | area String
+                | )
+                | PARTITIONED BY (vin String)
+                | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='5')
+                """.stripMargin)
+
+    // list partition
+    spark.sql("DROP TABLE IF EXISTS t5")
+
+    spark.sql("""
+       | CREATE TABLE IF NOT EXISTS t5
+       | (
+       | vin String,
+       | logdate Timestamp,
+       | phonenumber Long,
+       | country String,
+       | area String
+       |)
+       | PARTITIONED BY (country string)
+       | STORED BY 'carbondata'
+       | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+       | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ')
+       """.stripMargin)
+
+    // spark.sql(s"""
+    //   LOAD DATA LOCAL INPATH '$testData' into table t3
+    // options('BAD_RECORDS_ACTION'='FORCE')
+    //   """)
+
+    // spark.sql("select vin, count(*) from t3 group by vin
+    // order by count(*) desc").show(50)
+
+    // Drop table
+    // spark.sql("DROP TABLE IF EXISTS t3")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 695333c..3385245 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -132,7 +132,7 @@ struct SchemaEvolution{
 struct PartitionInfo{
     1: required list<ColumnSchema> partition_columns;
     2: required PartitionType partition_type;
-    3: optional i32 hash_number;  // number of partitions defined in hash partition table
+    3: optional i32 num_partitions;  // number of partitions defined in hash partition table
     4: optional list<list<string>> list_info; // value list of list partition table
     5: optional list<string> range_info;  // range value list of range partition table
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 679a4e7..5314a15 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -26,7 +26,8 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
+import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -144,6 +145,40 @@ object CommonUtil {
     isValid
   }
 
+  /**
+   * 1. If partitioned by clause exists, then partition_type should be defined
+   * 2. If partition_type is Hash, then number_of_partitions should be defined
+   * 3. If partition_type is List, then list_info should be defined
+   * 4. If partition_type is Range, then range_info should be defined
+   * 5. Only support single level partition for now
+   * @param tableProperties
+   * @param partitionerFields
+   * @return partition clause and definition in tblproperties are valid or not
+   */
+  def validatePartitionColumns(tableProperties: Map[String, String],
+      partitionerFields: Seq[PartitionerField]): Boolean = {
+    var isValid: Boolean = true
+    val partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE)
+    val numPartitions = tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS)
+    val rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO)
+    val listInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO)
+
+    if (partitionType.isEmpty) {
+      isValid = false
+    } else {
+      partitionType.get.toUpperCase() match {
+        case "HASH" => if (!numPartitions.isDefined) isValid = false
+        case "LIST" => if (!listInfo.isDefined) isValid = false
+        case "RANGE" => if (!rangeInfo.isDefined) isValid = false
+        case "RANGE_INTERVAL" => isValid = false
+        case _ => isValid = false
+      }
+      // only support one partition column for now
+      if (partitionerFields.length > 1) isValid = false
+    }
+    isValid
+  }
+
   def validateFields(key: String, fields: Seq[Field]): Boolean = {
     var isValid: Boolean = false
     fields.foreach { field =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 0df3b31..2e9e61d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map}
 import scala.language.implicitConversions
 import scala.util.matching.Regex
 
@@ -33,10 +33,13 @@ import org.apache.spark.sql.execution.command._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 /**
  * TODO remove the duplicate code and add the common methods to common class.
@@ -258,6 +261,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+    // get partitionInfo
+    val partitionInfo = getPartitionInfo(partitionCols, tableProperties)
 
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
@@ -275,7 +280,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields: Option[BucketFields])
+      bucketFields: Option[BucketFields],
+      partitionInfo)
   }
 
   /**
@@ -347,43 +353,69 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
-   * For getting the partitioner Object
-   *
    * @param partitionCols
    * @param tableProperties
-   * @return
    */
-  protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
-      tableProperties: Map[String, String]):
-  Option[Partitioner] = {
-
-    // by default setting partition class empty.
-    // later in table schema it is setting to default value.
-    var partitionClass: String = ""
-    var partitionCount: Int = 1
-    var partitionColNames: Array[String] = Array[String]()
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
-      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
-    }
-
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
-      try {
-        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
-      } catch {
-        case e: Exception => // no need to do anything.
+  protected def getPartitionInfo(partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): Option[PartitionInfo] = {
+    if (partitionCols.isEmpty) {
+      None
+    } else {
+      var partitionType: String = ""
+      var numPartitions = 0
+      var rangeInfo = List[String]()
+      var listInfo = ListBuffer[List[String]]()
+      var templist = ListBuffer[String]()
+      if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) {
+        partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get
       }
-    }
-
-    partitionCols.foreach(col =>
-      partitionColNames :+= col.partitionColumn
-    )
+      if (tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS).isDefined) {
+        numPartitions = tableProperties.get(CarbonCommonConstants.NUM_PARTITIONS).get
+          .toInt
+      }
+      if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) {
+        rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.split(",")
+          .map(_.trim()).toList
+      }
+      if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) {
+        val arr = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.split(",")
+          .map(_.trim())
+        val iter = arr.iterator
+        while (iter.hasNext) {
+          val value = iter.next()
+          if (value.startsWith("(")) {
+            templist += value.replace("(", "").trim()
+          } else if (value.endsWith(")")) {
+            templist += value.replace(")", "").trim()
+            listInfo += templist.toList
+            templist.clear()
+          } else {
+            templist += value
+            listInfo += templist.toList
+            templist.clear()
+          }
+        }
+      }
+      val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
+      partitionCols.foreach(partition_col => {
+        val columnSchema = new ColumnSchema
+        columnSchema.setDataType(DataTypeConverterUtil.
+          convertToCarbonType(partition_col.dataType.get))
+        columnSchema.setColumnName(partition_col.partitionColumn)
+        cols += columnSchema
+      })
 
-    // this means user has given partition cols list
-    if (!partitionColNames.isEmpty) {
-      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+      var partitionInfo : PartitionInfo = null
+      partitionType.toUpperCase() match {
+        case "HASH" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH)
+          partitionInfo.setNumPartitions(numPartitions)
+        case "RANGE" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE)
+          partitionInfo.setRangeInfo(rangeInfo.asJava)
+        case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
+          partitionInfo.setListInfo(listInfo.map(_.asJava).toList.asJava)
+      }
+      Some(partitionInfo)
     }
-    // if partition cols are not given then no need to do partition.
-    None
   }
 
   protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 187512d..6f50ecc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
+import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.CarbonCommonFactory
@@ -60,7 +60,8 @@ case class TableModel(
     columnGroups: Seq[String],
     colProps: Option[util.Map[String,
     util.List[ColumnProperty]]] = None,
-    bucketFields: Option[BucketFields])
+    bucketFields: Option[BucketFields],
+    partitionInfo: Option[PartitionInfo])
 
 case class Field(column: String, var dataType: Option[String], name: Option[String],
     children: Option[List[Field]], parent: String = null,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 068f852..3aa2d4e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -484,6 +484,8 @@ object TableCreator {
     // get no inverted index columns from table properties.
     val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
 
+    val partitionInfo = None
+
     // validate the tableBlockSize from table properties
     CommonUtil.validateTableBlockSize(tableProperties)
 
@@ -500,7 +502,8 @@ object TableCreator {
       Option(noInvertedIdxCols),
       groupCols,
       Some(colProps),
-      bucketFields: Option[BucketFields])
+      bucketFields: Option[BucketFields],
+      partitionInfo)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dae342bd/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index dc7e608..eb134d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext,
 TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
+PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
 import org.apache.carbondata.spark.CarbonOption
@@ -96,7 +97,9 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (ctx.bucketSpec != null) {
         operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
       }
-      val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+      val partitionerFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+        .map( structField =>
+            PartitionerField(structField.name, Some(structField.dataType.toString), null))
       val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
       val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
         .getOrElse(Map.empty)
@@ -111,21 +114,23 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
                             duplicateColumns.mkString("[", ",", "]"), ctx)
       }
 
-      // For Hive tables, partition columns must not be part of the schema
-      val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-      if (badPartCols.nonEmpty) {
-        operationNotAllowed(s"Partition columns may not be specified in the schema: " +
-                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-      }
-
-      // Note: Hive requires partition columns to be distinct from the schema, so we need
-      // to include the partition columns here explicitly
-      val schema = cols ++ partitionCols
+      val tableProperties = mutable.Map[String, String]()
+      properties.foreach{property => tableProperties.put(property._1, property._2.toLowerCase)}
 
-      val fields = parser.getFields(schema)
+      // validate partition clause
+      if (partitionerFields.nonEmpty) {
+        if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+          throw new MalformedCarbonCommandException("Invalid partition definition")
+        }
+        // partition columns must be part of the schema
+        val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+        if (badPartCols.isEmpty) {
+          operationNotAllowed(s"Partition columns must be specified in the schema: " +
+                              badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+        }
+      }
 
-      val tableProperties = mutable.Map[String, String]()
-      properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
+      val fields = parser.getFields(cols)
 
       val options = new CarbonOption(properties)
       // validate tblProperties
@@ -136,7 +141,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         convertDbNameToLowerCase(name.database),
         name.table.toLowerCase,
         fields,
-        Seq(),
+        partitionerFields,
         tableProperties,
         bucketFields)