You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/11 13:53:29 UTC
[39/50] carbondata git commit: fixed complex data type in CarbonSource
fixed complex data type in CarbonSource
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10b9e273
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10b9e273
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10b9e273
Branch: refs/heads/branch-1.1
Commit: 10b9e273112446f9aa4ce760130a8da091a3d719
Parents: 409bd15
Author: kunal642 <ku...@knoldus.in>
Authored: Tue May 9 17:43:23 2017 +0530
Committer: kunal642 <ku...@knoldus.in>
Committed: Tue May 9 20:24:23 2017 +0530
----------------------------------------------------------------------
.../org/apache/spark/sql/CarbonSource.scala | 61 +++++-------------
.../sql/parser/CarbonSpark2SqlParser.scala | 58 +++++++++++++++++
.../spark/sql/parser/CarbonSparkSqlParser.scala | 56 ++--------------
.../carbondata/CarbonDataSourceSuite.scala | 67 +++++++++++++++++++-
4 files changed, 148 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 19a278b..1c16143 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DecimalType, StructType}
@@ -114,8 +115,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
addLateDecodeOptimization(sqlContext.sparkSession)
val dbName: String = parameters.getOrElse("dbName",
CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
- val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
-
+ val tableOption: Option[String] = parameters.get("tableName")
+ if (tableOption.isEmpty) {
+ sys.error("Table creation failed. Table name is not specified")
+ }
+ val tableName = tableOption.get.toLowerCase()
+ if (tableName.contains(" ")) {
+ sys.error("Table creation failed. Table name cannot contain blank space")
+ }
val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
getPathForTable(sqlContext.sparkSession, dbName, tableName)
@@ -140,55 +147,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
val dbName: String = parameters.getOrElse("dbName",
CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
- val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
- if (StringUtils.isBlank(tableName)) {
- throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
- }
- if (tableName.contains(" ")) {
- throw new MalformedCarbonCommandException("Table Name Should not have spaces ")
- }
- val options = new CarbonOption(parameters)
+ val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
try {
CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession)
CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
} catch {
case ex: NoSuchTableException =>
- val fields = dataSchema.map { col =>
- val dataType = Option(col.dataType.toString)
- // This is to parse complex data types
- val colName = col.name.toLowerCase
- val f: Field = Field(colName, dataType, Option(colName), None, null)
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- Option(col.dataType).foreach {
- case d: DecimalType =>
- f.precision = d.precision
- f.scale = d.scale
- f.dataType = Some("decimal")
- case _ => // do nothing
- }
- f
- }
+ val sqlParser = new CarbonSpark2SqlParser
+ val fields = sqlParser.getFields(dataSchema)
val map = scala.collection.mutable.Map[String, String]()
- parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
- val bucketFields = if (options.isBucketingEnabled) {
- if (options.bucketNumber.toString.contains("-") ||
- options.bucketNumber.toString.contains("+")) {
- throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
- options.bucketNumber.toString)
- }
- else {
- Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
- options.bucketNumber))
- }
- } else {
- None
- }
-
- val cm = TableCreator.prepareTableModel(ifNotExistPresent = false, Option(dbName),
- tableName, fields, Nil, bucketFields, map)
+ parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
+ val options = new CarbonOption(parameters)
+ val bucketFields = sqlParser.getBucketFields(map, fields, options)
+ val cm = sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
+ tableName, fields, Nil, map, bucketFields)
CreateTable(cm, false).run(sparkSession)
CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
case ex: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 695f5fc..d1a764f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -17,14 +17,17 @@
package org.apache.spark.sql.parser
+import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.spark.sql.ShowLoadsCommand
import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.types.StructField
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -238,4 +241,59 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
values.map(_.toLowerCase))
AlterTableDropColumns(alterTableDropColumnModel)
}
+
+  /**
+   * Converts Spark schema columns into Carbon [[Field]] definitions.
+   *
+   * Each column is rendered as a "`name` type" definition string and parsed with
+   * `anyFieldDef`, so complex types such as array<string> go through the DDL field
+   * grammar. The top-level type is then normalised: a `float` column is declared as
+   * `double`, `decimal(p,s)` has its precision and scale copied onto the field, and
+   * `char(n)` is reduced to `char`.
+   *
+   * @param schema columns to convert, in table order
+   * @return one parsed Field per column, with `rawSchema` set to its definition text
+   * @throws MalformedCarbonCommandException if a column definition cannot be parsed
+   */
+  def getFields(schema: Seq[StructField]): Seq[Field] = {
+    schema.map { col =>
+      val catalogType = col.dataType.catalogString
+      // Top-level float columns are declared to Carbon as double.
+      val declaredType = if (catalogType == "float") "double" else catalogType
+      val rawSchema = s"`${col.name}` $declaredType"
+      val f: Field = anyFieldDef(new lexical.Scanner(rawSchema.toLowerCase)) match {
+        case Success(field, _) => field.asInstanceOf[Field]
+        // report the column's type, not "StructField(...).getType"
+        case _ => throw new MalformedCarbonCommandException(
+          s"Unsupported data type: $catalogType")
+      }
+      val parsedType = f.dataType.getOrElse("")
+      if (parsedType.startsWith("decimal")) {
+        // The parsed type looks like "decimal(10,0)": keep only the base name and
+        // copy precision and scale onto the field.
+        val (precision, scale) = getScaleAndPrecision(catalogType)
+        f.precision = precision
+        f.scale = scale
+        f.dataType = Some("decimal")
+      } else if (parsedType.startsWith("char")) {
+        f.dataType = Some("char")
+      } else if (parsedType.startsWith("float")) {
+        f.dataType = Some("double")
+      }
+      f.rawSchema = rawSchema
+      f
+    }
+  }
+
+  /**
+   * Validates the table properties against the parsed columns and builds the
+   * bucketing specification.
+   *
+   * @param properties table properties passed to CommonUtil.validateTblProperties
+   * @param fields     parsed table columns
+   * @param options    Carbon options supplying the bucket columns and bucket number
+   * @return Some(BucketFields) when the options enable bucketing, otherwise None
+   * @throws MalformedCarbonCommandException if the properties fail validation or the
+   *                                         bucket number's text contains '-' or '+'
+   */
+  def getBucketFields(properties: mutable.Map[String, String],
+      fields: Seq[Field],
+      options: CarbonOption): Option[BucketFields] = {
+    val propertiesAreValid = CommonUtil.validateTblProperties(properties, fields)
+    if (!propertiesAreValid) {
+      throw new MalformedCarbonCommandException("Invalid table properties")
+    }
+    if (!options.isBucketingEnabled) {
+      None
+    } else {
+      val bucketNumberText = options.bucketNumber.toString
+      // a signed bucket number is rejected
+      if (bucketNumberText.contains("-") || bucketNumberText.contains("+")) {
+        throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED")
+      }
+      val bucketColumns = options.bucketColumns.toLowerCase.split(",").map(_.trim)
+      Some(BucketFields(bucketColumns, options.bucketNumber))
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 02c1366..dc7e608 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -122,58 +122,15 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
// to include the partition columns here explicitly
val schema = cols ++ partitionCols
- val fields = schema.map { col =>
- val x = if (col.dataType.catalogString == "float") {
- '`' + col.name + '`' + " double"
- }
- else {
- '`' + col.name + '`' + ' ' + col.dataType.catalogString
- }
- val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x.toLowerCase))
- match {
- case parser.Success(field, _) => field.asInstanceOf[Field]
- case failureOrError => throw new MalformedCarbonCommandException(
- s"Unsupported data type: $col.getType")
- }
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = parser.getScaleAndPrecision(col.dataType.catalogString)
- f.precision = precision
- f.scale = scale
- f.dataType = Some("decimal")
- }
- if (f.dataType.getOrElse("").startsWith("char")) {
- f.dataType = Some("char")
- }
- else if (f.dataType.getOrElse("").startsWith("float")) {
- f.dataType = Some("double")
- }
- f.rawSchema = x
- f
- }
-
- // validate tblProperties
- if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
- throw new MalformedCarbonCommandException("Invalid table properties")
- }
- val options = new CarbonOption(properties)
- val bucketFields = if (options.isBucketingEnabled) {
- if (options.bucketNumber.toString.contains("-") ||
- options.bucketNumber.toString.contains("+")) {
- throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED")
- }
- else {
- Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
- options.bucketNumber))
- }
- } else {
- None
- }
+ val fields = parser.getFields(schema)
val tableProperties = mutable.Map[String, String]()
properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
+
+ val options = new CarbonOption(properties)
+ // validate tblProperties
+ val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
// prepare table model of the collected tokens
val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
convertDbNameToLowerCase(name.database),
@@ -217,4 +174,5 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
}
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 3d814a1..52e3764 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.carbondata
+import scala.collection.mutable
+
import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
@@ -42,7 +44,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
| stringField string,
| decimalField decimal(13, 0),
| timestampField string)
- | USING org.apache.spark.sql.CarbonSource
+ | USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='carbon_testtable')
""".stripMargin)
sql(
@@ -192,4 +194,67 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
}
}
+  test("test create table with complex datatype") {
+    // drop first so a table left behind by an earlier failed run does not break the create
+    sql("drop table if exists create_source")
+    sql("create table create_source(intField int, stringField string, " +
+      "complexField array<string>) USING org.apache.spark.sql.CarbonSource " +
+      "OPTIONS('tableName'='create_source')")
+    // plain drop (no IF EXISTS) also checks that the create registered the table
+    sql("drop table create_source")
+  }
+
+  /** Asserts that creating create_source bucketed on `bucketColumn` throws. */
+  private def assertBucketedCreateFails(bucketColumn: String): Unit = {
+    sql("drop table if exists create_source")
+    intercept[Exception] {
+      sql("create table create_source(intField int, stringField string, " +
+        "complexField array<string>) USING org.apache.spark.sql.CarbonSource " +
+        s"OPTIONS('bucketnumber'='1', 'bucketcolumns'='$bucketColumn','tableName'='create_source')")
+    }
+  }
+
+  test("test to create bucket columns with int field") {
+    assertBucketedCreateFails("intField")
+  }
+
+  test("test to create bucket columns with complex data type field") {
+    assertBucketedCreateFails("complexField")
+  }
+
+  test("test check results of table with complex data type and bucketing") {
+    sql("drop table if exists create_source")
+    sql("create table create_source(intField int, stringField string, " +
+      "complexField array<int>) USING org.apache.spark.sql.CarbonSource " +
+      "OPTIONS('bucketnumber'='1', 'bucketcolumns'='stringField', 'tableName'='create_source')")
+    sql("""insert into create_source values(1,"source","1$2$3")""")
+    // Expect an actual collection: the Builder returned by newBuilder does not
+    // compare equal to the collected array value.
+    checkAnswer(sql("select * from create_source"),
+      Row(1, "source", mutable.WrappedArray.make[Int](Array(1, 2, 3))))
+    sql("drop table if exists create_source")
+  }
+
+  test("test create table without tableName in options") {
+    sql("drop table if exists carbon_test")
+    val exception = intercept[RuntimeException] {
+      sql(
+        s"""
+           | CREATE TABLE carbon_test(
+           |   stringField string,
+           |   intField int)
+           | USING org.apache.spark.sql.CarbonSource
+           | OPTIONS('DICTIONARY_EXCLUDE'='stringField')
+        """.stripMargin
+      )
+    }.getMessage
+    sql("drop table if exists carbon_test")
+    // Compare message text; `eq` only checks reference identity of the two strings.
+    assert(exception.contains("Table creation failed. Table name is not specified"))
+  }
+
+  test("test create table with space in tableName") {
+    sql("drop table if exists carbon_test")
+    val exception = intercept[RuntimeException] {
+      sql(
+        s"""
+           | CREATE TABLE carbon_test(
+           |   stringField string,
+           |   intField int)
+           | USING org.apache.spark.sql.CarbonSource
+           | OPTIONS('DICTIONARY_EXCLUDE'='stringField', 'tableName'='carbon test')
+        """.stripMargin
+      )
+    }.getMessage
+    sql("drop table if exists carbon_test")
+    // Compare message text; `eq` only checks reference identity of the two strings.
+    assert(exception.contains("Table creation failed. Table name cannot contain blank space"))
+  }
+
}