You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/08 04:17:30 UTC

carbondata git commit: [CARBONDATA-1580][Streaming] Create table with streaming property

Repository: carbondata
Updated Branches:
  refs/heads/master 74bd52b66 -> 4c41f8662


[CARBONDATA-1580][Streaming] Create table with streaming property

This closes #1449


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c41f866
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c41f866
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c41f866

Branch: refs/heads/master
Commit: 4c41f8662c60c10f35f0eed4758975ea8b926a6e
Parents: 74bd52b
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 8 10:51:04 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 8 12:16:44 2017 +0800

----------------------------------------------------------------------
 .../apache/carbondata/spark/CarbonOption.scala  |   2 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  10 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   3 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 114 ++++++++++++-------
 .../TestStreamingTableOperation.scala           |  30 ++++-
 5 files changed, 116 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 0bc9285..fe07aac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -59,5 +59,7 @@ class CarbonOption(options: Map[String, String]) {
   def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
                                     options.contains("bucketnumber")
 
+  def isStreaming: Boolean = options.getOrElse("streaming", "false").toBoolean
+
   def toMap: Map[String, String] = options
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index aae4f25..ee51954 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -242,11 +242,15 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @param tableProperties
    * @return
    */
-  def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
-      , tableName: String, fields: Seq[Field],
+  def prepareTableModel(
+      ifNotExistPresent: Boolean,
+      dbName: Option[String],
+      tableName: String,
+      fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
       tableProperties: mutable.Map[String, String],
-      bucketFields: Option[BucketFields], isAlterFlow: Boolean = false,
+      bucketFields: Option[BucketFields],
+      isAlterFlow: Boolean = false,
       tableComment: Option[String] = None): TableModel = {
 
     fields.zipWithIndex.foreach { case (field, index) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9f4a8ce..9c87b8b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -456,7 +456,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
   }
 
-  def getBucketFields(properties: mutable.Map[String, String],
+  def getBucketFields(
+      properties: mutable.Map[String, String],
       fields: Seq[Field],
       options: CarbonOption): Option[BucketFields] = {
     if (!CommonUtil.validateTblProperties(properties,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 81ce73f..0a918df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,15 +19,17 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.StructField
 
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -99,54 +101,42 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       if (ctx.bucketSpec != null) {
         operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
       }
-      val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
-      val partitionerFields = partitionByStructFields.map { structField =>
-        PartitionerField(structField.name, Some(structField.dataType.toString), null)
-      }
-      val tableComment = Option(ctx.STRING()).map(string)
-      val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
-        .getOrElse(Map.empty)
 
-      // Ensuring whether no duplicate name is used in table definition
-      val colNames = cols.map(_.name)
-      if (colNames.length != colNames.distinct.length) {
-        val duplicateColumns = colNames.groupBy(identity).collect {
-          case (x, ys) if ys.length > 1 => "\"" + x + "\""
-        }
-        operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-                            duplicateColumns.mkString("[", ",", "]"), ctx)
-      }
+      // validate schema
+      val (colsStructFields, colNames) = validateSchema(ctx, name)
 
       val tableProperties = mutable.Map[String, String]()
+      val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+        .getOrElse(Map.empty)
       properties.foreach{property => tableProperties.put(property._1, property._2)}
 
-      // validate partition clause
-      if (partitionerFields.nonEmpty) {
-        if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-          throw new MalformedCarbonCommandException("Error: Invalid partition definition")
-        }
-        // partition columns should not be part of the schema
-        val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
-        if (badPartCols.nonEmpty) {
-          operationNotAllowed(s"Partition columns should not be specified in the schema: " +
-                              badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-        }
-      }
-      val fields = parser.getFields(cols ++ partitionByStructFields)
       val options = new CarbonOption(properties)
-      // validate tblProperties
-      val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
+      // validate streaming table property
+      validateStreamingProperty(ctx, options)
+
+      // validate partition clause
+      val (partitionByStructFields, partitionFields) =
+        validateParitionFields(ctx, colNames, tableProperties)
+
+      val fields = parser.getFields(colsStructFields ++ partitionByStructFields)
+
+      // validate bucket fields
+      val bucketFields: Option[BucketFields] =
+        parser.getBucketFields(tableProperties, fields, options)
+
+      val tableComment = Option(ctx.STRING()).map(string)
 
       // prepare table model of the collected tokens
-      val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+      val tableModel: TableModel = parser.prepareTableModel(
+        ifNotExists,
         convertDbNameToLowerCase(name.database),
         name.table.toLowerCase,
         fields,
-        partitionerFields,
+        partitionFields,
         tableProperties,
         bucketFields,
-        false,
+        isAlterFlow = false,
         tableComment)
 
       CarbonCreateTableCommand(tableModel)
@@ -189,8 +179,56 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
 
   private def needToConvertToLowerCase(key: String): Boolean = {
     val noConvertList = Array("LIST_INFO", "RANGE_INFO")
-    !noConvertList.exists(x => x.equalsIgnoreCase(key));
+    !noConvertList.exists(x => x.equalsIgnoreCase(key))
+  }
+
+  private def validateParitionFields(
+      ctx: CreateTableContext,
+      colNames: Seq[String],
+      tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
+    val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+    val partitionerFields = partitionByStructFields.map { structField =>
+      PartitionerField(structField.name, Some(structField.dataType.toString), null)
+    }
+    if (partitionerFields.nonEmpty) {
+      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+        throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+      }
+      // partition columns should not be part of the schema
+      val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+      if (badPartCols.nonEmpty) {
+        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+                            badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+      }
+    }
+    (partitionByStructFields, partitionerFields)
   }
 
+  private def validateSchema(
+      ctx: CreateTableContext,
+      name: TableIdentifier): (Seq[StructField], Seq[String]) = {
+    // Validate schema, ensuring whether no duplicate name is used in table definition
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val colNames = cols.map(_.name)
+    if (colNames.length != colNames.distinct.length) {
+      val duplicateColumns = colNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }
+      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
+                          duplicateColumns.mkString("[", ",", "]"), ctx)
+    }
+    (cols, colNames)
+  }
 
+  private def validateStreamingProperty(
+      ctx: CreateTableContext,
+      carbonOption: CarbonOption): Unit = {
+    try {
+      carbonOption.isStreaming
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new MalformedCarbonCommandException(
+          "Table property 'streaming' should be either 'true' or 'false'")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 2c1c6b8..b733d4f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.carbondata
 
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -31,7 +32,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("USE streaming")
     sql(
       """
-        | create table source(
+        | CREATE TABLE source(
         |    c1 string,
         |    c2 int,
         |    c3 string,
@@ -42,6 +43,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
   }
 
+  test("validate streaming property") {
+    sql(
+      """
+        | CREATE TABLE correct(
+        |    c1 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'true')
+      """.stripMargin)
+    sql("DROP TABLE correct")
+    sql(
+      """
+        | CREATE TABLE correct(
+        |    c1 string
+        | ) STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES ('streaming' = 'false')
+      """.stripMargin)
+    sql("DROP TABLE correct")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        """
+          | create table wrong(
+          |    c1 string
+          | ) STORED BY 'org.apache.carbondata.format'
+          | TBLPROPERTIES ('streaming' = 'invalid')
+        """.stripMargin)
+    }
+  }
 
   test("test blocking update and delete operation on streaming table") {
     intercept[MalformedCarbonCommandException] {