You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/08 04:17:30 UTC
carbondata git commit: [CARBONDATA-1580][Streaming] Create table with streaming property
Repository: carbondata
Updated Branches:
refs/heads/master 74bd52b66 -> 4c41f8662
[CARBONDATA-1580][Streaming] Create table with streaming property
This closes #1449
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c41f866
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c41f866
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c41f866
Branch: refs/heads/master
Commit: 4c41f8662c60c10f35f0eed4758975ea8b926a6e
Parents: 74bd52b
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 8 10:51:04 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 8 12:16:44 2017 +0800
----------------------------------------------------------------------
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 10 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 3 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 114 ++++++++++++-------
.../TestStreamingTableOperation.scala | 30 ++++-
5 files changed, 116 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 0bc9285..fe07aac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -59,5 +59,7 @@ class CarbonOption(options: Map[String, String]) {
def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
options.contains("bucketnumber")
+ def isStreaming: Boolean = options.getOrElse("streaming", "false").toBoolean
+
def toMap: Map[String, String] = options
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index aae4f25..ee51954 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -242,11 +242,15 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
* @param tableProperties
* @return
*/
- def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
- , tableName: String, fields: Seq[Field],
+ def prepareTableModel(
+ ifNotExistPresent: Boolean,
+ dbName: Option[String],
+ tableName: String,
+ fields: Seq[Field],
partitionCols: Seq[PartitionerField],
tableProperties: mutable.Map[String, String],
- bucketFields: Option[BucketFields], isAlterFlow: Boolean = false,
+ bucketFields: Option[BucketFields],
+ isAlterFlow: Boolean = false,
tableComment: Option[String] = None): TableModel = {
fields.zipWithIndex.foreach { case (field, index) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9f4a8ce..9c87b8b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -456,7 +456,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
}
- def getBucketFields(properties: mutable.Map[String, String],
+ def getBucketFields(
+ properties: mutable.Map[String, String],
fields: Seq[Field],
options: CarbonOption): Option[BucketFields] = {
if (!CommonUtil.validateTblProperties(properties,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 81ce73f..0a918df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,15 +19,17 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, PartitionerField, TableModel}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.StructField
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -99,54 +101,42 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
if (ctx.bucketSpec != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
}
- val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
- val partitionerFields = partitionByStructFields.map { structField =>
- PartitionerField(structField.name, Some(structField.dataType.toString), null)
- }
- val tableComment = Option(ctx.STRING()).map(string)
- val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
- val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
- .getOrElse(Map.empty)
- // Ensuring whether no duplicate name is used in table definition
- val colNames = cols.map(_.name)
- if (colNames.length != colNames.distinct.length) {
- val duplicateColumns = colNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }
- operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
- duplicateColumns.mkString("[", ",", "]"), ctx)
- }
+ // validate schema
+ val (colsStructFields, colNames) = validateSchema(ctx, name)
val tableProperties = mutable.Map[String, String]()
+ val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+ .getOrElse(Map.empty)
properties.foreach{property => tableProperties.put(property._1, property._2)}
- // validate partition clause
- if (partitionerFields.nonEmpty) {
- if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
- throw new MalformedCarbonCommandException("Error: Invalid partition definition")
- }
- // partition columns should not be part of the schema
- val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
- if (badPartCols.nonEmpty) {
- operationNotAllowed(s"Partition columns should not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
- }
- }
- val fields = parser.getFields(cols ++ partitionByStructFields)
val options = new CarbonOption(properties)
- // validate tblProperties
- val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
+ // validate streaming table property
+ validateStreamingProperty(ctx, options)
+
+ // validate partition clause
+ val (partitionByStructFields, partitionFields) =
+ validateParitionFields(ctx, colNames, tableProperties)
+
+ val fields = parser.getFields(colsStructFields ++ partitionByStructFields)
+
+ // validate bucket fields
+ val bucketFields: Option[BucketFields] =
+ parser.getBucketFields(tableProperties, fields, options)
+
+ val tableComment = Option(ctx.STRING()).map(string)
// prepare table model of the collected tokens
- val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+ val tableModel: TableModel = parser.prepareTableModel(
+ ifNotExists,
convertDbNameToLowerCase(name.database),
name.table.toLowerCase,
fields,
- partitionerFields,
+ partitionFields,
tableProperties,
bucketFields,
- false,
+ isAlterFlow = false,
tableComment)
CarbonCreateTableCommand(tableModel)
@@ -189,8 +179,56 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
private def needToConvertToLowerCase(key: String): Boolean = {
val noConvertList = Array("LIST_INFO", "RANGE_INFO")
- !noConvertList.exists(x => x.equalsIgnoreCase(key));
+ !noConvertList.exists(x => x.equalsIgnoreCase(key))
+ }
+
+ private def validateParitionFields(
+ ctx: CreateTableContext,
+ colNames: Seq[String],
+ tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
+ val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
+ val partitionerFields = partitionByStructFields.map { structField =>
+ PartitionerField(structField.name, Some(structField.dataType.toString), null)
+ }
+ if (partitionerFields.nonEmpty) {
+ if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+ throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+ }
+ // partition columns should not be part of the schema
+ val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+ if (badPartCols.nonEmpty) {
+ operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+ }
+ }
+ (partitionByStructFields, partitionerFields)
}
+ private def validateSchema(
+ ctx: CreateTableContext,
+ name: TableIdentifier): (Seq[StructField], Seq[String]) = {
+ // Validate schema, ensuring whether no duplicate name is used in table definition
+ val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+ val colNames = cols.map(_.name)
+ if (colNames.length != colNames.distinct.length) {
+ val duplicateColumns = colNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }
+ operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
+ duplicateColumns.mkString("[", ",", "]"), ctx)
+ }
+ (cols, colNames)
+ }
+ private def validateStreamingProperty(
+ ctx: CreateTableContext,
+ carbonOption: CarbonOption): Unit = {
+ try {
+ carbonOption.isStreaming
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new MalformedCarbonCommandException(
+ "Table property 'streaming' should be either 'true' or 'false'")
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c41f866/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 2c1c6b8..b733d4f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -17,6 +17,7 @@
package org.apache.spark.carbondata
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -31,7 +32,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("USE streaming")
sql(
"""
- | create table source(
+ | CREATE TABLE source(
| c1 string,
| c2 int,
| c3 string,
@@ -42,6 +43,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""")
}
+ test("validate streaming property") {
+ sql(
+ """
+ | CREATE TABLE correct(
+ | c1 string
+ | ) STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES ('streaming' = 'true')
+ """.stripMargin)
+ sql("DROP TABLE correct")
+ sql(
+ """
+ | CREATE TABLE correct(
+ | c1 string
+ | ) STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES ('streaming' = 'false')
+ """.stripMargin)
+ sql("DROP TABLE correct")
+ intercept[MalformedCarbonCommandException] {
+ sql(
+ """
+ | create table wrong(
+ | c1 string
+ | ) STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES ('streaming' = 'invalid')
+ """.stripMargin)
+ }
+ }
test("test blocking update and delete operation on streaming table") {
intercept[MalformedCarbonCommandException] {