Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/11 13:53:29 UTC

[39/50] carbondata git commit: fixed complex data type in CarbonSource

fixed complex data type in CarbonSource


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10b9e273
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10b9e273
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10b9e273

Branch: refs/heads/branch-1.1
Commit: 10b9e273112446f9aa4ce760130a8da091a3d719
Parents: 409bd15
Author: kunal642 <ku...@knoldus.in>
Authored: Tue May 9 17:43:23 2017 +0530
Committer: kunal642 <ku...@knoldus.in>
Committed: Tue May 9 20:24:23 2017 +0530

----------------------------------------------------------------------
 .../org/apache/spark/sql/CarbonSource.scala     | 61 +++++-------------
 .../sql/parser/CarbonSpark2SqlParser.scala      | 58 +++++++++++++++++
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 56 ++--------------
 .../carbondata/CarbonDataSourceSuite.scala      | 67 +++++++++++++++++++-
 4 files changed, 148 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
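
Context for the change: creating a table through the Spark datasource API
("USING org.apache.spark.sql.CarbonSource") previously failed for complex
columns such as array<string>, because CarbonSource translated the Spark
schema into Carbon Field objects by hand and only special-cased decimals.
This commit delegates that translation to CarbonSpark2SqlParser, which runs
each column through the DDL grammar, and makes the 'tableName' option
mandatory. A minimal sketch of the now-supported DDL, modelled on the new
test cases below (the table name is illustrative):

    // Assumes a SparkSession wired up with CarbonData, as provided by the
    // suite's QueryTest base class.
    sql(
      """
        | CREATE TABLE complex_source(
        |    intField int,
        |    stringField string,
        |    complexField array<string>)
        | USING org.apache.spark.sql.CarbonSource
        | OPTIONS('tableName'='complex_source')
      """.stripMargin)

    // Omitting 'tableName' now fails fast with
    // "Table creation failed. Table name is not specified".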


http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 19a278b..1c16143 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 
@@ -114,8 +115,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     addLateDecodeOptimization(sqlContext.sparkSession)
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
-
+    val tableOption: Option[String] = parameters.get("tableName")
+    if (tableOption.isEmpty) {
+      sys.error("Table creation failed. Table name is not specified")
+    }
+    val tableName = tableOption.get.toLowerCase()
+    if (tableName.contains(" ")) {
+      sys.error("Table creation failed. Table name cannot contain blank space")
+    }
     val path = if (sqlContext.sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
         getPathForTable(sqlContext.sparkSession, dbName, tableName)
@@ -140,55 +147,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
-    if (StringUtils.isBlank(tableName)) {
-      throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
-    }
-    if (tableName.contains(" ")) {
-      throw new MalformedCarbonCommandException("Table Name Should not have spaces ")
-    }
-    val options = new CarbonOption(parameters)
+    val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
     try {
       CarbonEnv.getInstance(sparkSession).carbonMetastore
         .lookupRelation(Option(dbName), tableName)(sparkSession)
       CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
     } catch {
       case ex: NoSuchTableException =>
-        val fields = dataSchema.map { col =>
-          val dataType = Option(col.dataType.toString)
-          // This is to parse complex data types
-          val colName = col.name.toLowerCase
-          val f: Field = Field(colName, dataType, Option(colName), None, null)
-          // the data type of the decimal type will be like decimal(10,0)
-          // so checking the start of the string and taking the precision and scale.
-          // resetting the data type with decimal
-          Option(col.dataType).foreach {
-            case d: DecimalType =>
-              f.precision = d.precision
-              f.scale = d.scale
-              f.dataType = Some("decimal")
-            case _ => // do nothing
-          }
-          f
-        }
+        val sqlParser = new CarbonSpark2SqlParser
+        val fields = sqlParser.getFields(dataSchema)
         val map = scala.collection.mutable.Map[String, String]()
-        parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
-        val bucketFields = if (options.isBucketingEnabled) {
-          if (options.bucketNumber.toString.contains("-") ||
-              options.bucketNumber.toString.contains("+")) {
-            throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
-                                                      options.bucketNumber.toString)
-          }
-          else {
-            Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
-              options.bucketNumber))
-          }
-        } else {
-          None
-        }
-
-        val cm = TableCreator.prepareTableModel(ifNotExistPresent = false, Option(dbName),
-          tableName, fields, Nil, bucketFields, map)
+        parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
+        val options = new CarbonOption(parameters)
+        val bucketFields = sqlParser.getBucketFields(map, fields, options)
+        val cm = sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
+          tableName, fields, Nil, map, bucketFields)
         CreateTable(cm, false).run(sparkSession)
         CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
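
The first hunk above adds the same fail-fast checks where CarbonSource
resolves the table from the caller's options, so the programmatic write
path must also carry 'tableName'. A rough sketch, assuming the standard
datasource write API (DataFrame contents and names invented for
illustration):

    import org.apache.spark.sql.SaveMode

    val df = spark.range(10)
      .selectExpr("id as intField", "cast(id as string) as stringField")

    df.write
      .format("org.apache.spark.sql.CarbonSource")
      .option("tableName", "write_target") // required; omitting it raises sys.error
      .option("dbName", "default")         // optional; defaults to the default database
      .mode(SaveMode.Overwrite)
      .save()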

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 695f5fc..d1a764f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.parser
 
+import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.ShowLoadsCommand
 import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.types.StructField
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -238,4 +241,59 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           values.map(_.toLowerCase))
         AlterTableDropColumns(alterTableDropColumnModel)
     }
+
+  def getFields(schema: Seq[StructField]): Seq[Field] = {
+    schema.map { col =>
+      val x = if (col.dataType.catalogString == "float") {
+        '`' + col.name + '`' + " double"
+      }
+      else {
+        '`' + col.name + '`' + ' ' + col.dataType.catalogString
+      }
+      val f: Field = anyFieldDef(new lexical.Scanner(x.toLowerCase))
+      match {
+        case Success(field, _) => field.asInstanceOf[Field]
+        case failureOrError => throw new MalformedCarbonCommandException(
+          s"Unsupported data type: $col.getType")
+      }
+      // the data type of a decimal column arrives as e.g. decimal(10,0),
+      // so check the prefix of the string, extract the precision and scale,
+      // and reset the data type to plain "decimal"
+      if (f.dataType.getOrElse("").startsWith("decimal")) {
+        val (precision, scale) = getScaleAndPrecision(col.dataType.catalogString)
+        f.precision = precision
+        f.scale = scale
+        f.dataType = Some("decimal")
+      }
+      if (f.dataType.getOrElse("").startsWith("char")) {
+        f.dataType = Some("char")
+      }
+      else if (f.dataType.getOrElse("").startsWith("float")) {
+        f.dataType = Some("double")
+      }
+      f.rawSchema = x
+      f
+    }
+  }
+
+  def getBucketFields(properties: mutable.Map[String, String],
+      fields: Seq[Field],
+      options: CarbonOption): Option[BucketFields] = {
+    if (!CommonUtil.validateTblProperties(properties,
+      fields)) {
+      throw new MalformedCarbonCommandException("Invalid table properties")
+    }
+    if (options.isBucketingEnabled) {
+      if (options.bucketNumber.toString.contains("-") ||
+          options.bucketNumber.toString.contains("+")) {
+        throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED")
+      }
+      else {
+        Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
+          options.bucketNumber))
+      }
+    } else {
+      None
+    }
+  }
 }
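
The two helpers added above centralize logic that was previously duplicated
in CarbonSparkSqlParser and CarbonSource: getFields round-trips every
StructField through the DDL grammar (anyFieldDef), which is what makes
complex types parse, while normalizing float to double and restoring
decimal precision and scale; getBucketFields validates the table properties
and the bucket count. A rough usage sketch (schema and property values
invented for illustration):

    import scala.collection.mutable
    import org.apache.spark.sql.types._
    import org.apache.carbondata.spark.CarbonOption

    val parser = new CarbonSpark2SqlParser
    val schema = StructType(Seq(
      StructField("intField", IntegerType),
      StructField("stringField", StringType),
      StructField("complexField", ArrayType(IntegerType)))) // complex type, now handled

    val fields = parser.getFields(schema) // Seq[Field]; float -> double, decimal(p,s) restored

    val props = mutable.Map("bucketnumber" -> "1", "bucketcolumns" -> "stringfield")
    val bucketFields = parser.getBucketFields(props, fields, new CarbonOption(props.toMap))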

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 02c1366..dc7e608 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -122,58 +122,15 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       // to include the partition columns here explicitly
       val schema = cols ++ partitionCols
 
-      val fields = schema.map { col =>
-        val x = if (col.dataType.catalogString == "float") {
-          '`' + col.name + '`' + " double"
-        }
-        else {
-          '`' + col.name + '`' + ' ' + col.dataType.catalogString
-        }
-        val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x.toLowerCase))
-        match {
-          case parser.Success(field, _) => field.asInstanceOf[Field]
-          case failureOrError => throw new MalformedCarbonCommandException(
-            s"Unsupported data type: $col.getType")
-        }
-        // the data type of the decimal type will be like decimal(10,0)
-        // so checking the start of the string and taking the precision and scale.
-        // resetting the data type with decimal
-        if (f.dataType.getOrElse("").startsWith("decimal")) {
-          val (precision, scale) = parser.getScaleAndPrecision(col.dataType.catalogString)
-          f.precision = precision
-          f.scale = scale
-          f.dataType = Some("decimal")
-        }
-        if (f.dataType.getOrElse("").startsWith("char")) {
-          f.dataType = Some("char")
-        }
-        else if (f.dataType.getOrElse("").startsWith("float")) {
-          f.dataType = Some("double")
-        }
-        f.rawSchema = x
-        f
-      }
-
-      // validate tblProperties
-      if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
-        throw new MalformedCarbonCommandException("Invalid table properties")
-      }
-      val options = new CarbonOption(properties)
-      val bucketFields = if (options.isBucketingEnabled) {
-        if (options.bucketNumber.toString.contains("-") ||
-            options.bucketNumber.toString.contains("+")) {
-          throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED")
-        }
-        else {
-          Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
-            options.bucketNumber))
-        }
-      } else {
-        None
-      }
+      val fields = parser.getFields(schema)
 
       val tableProperties = mutable.Map[String, String]()
       properties.foreach(f => tableProperties.put(f._1, f._2.toLowerCase))
+
+      val options = new CarbonOption(properties)
+      // validate tblProperties
+      val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
       // prepare table model of the collected tokens
       val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
         convertDbNameToLowerCase(name.database),
@@ -217,4 +174,5 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
     }
   }
 
+
 }
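
With the duplicated block removed, the DDL path handled by
CarbonSqlAstBuilder and the datasource path in CarbonSource now share one
implementation, so malformed bucket specifications are rejected identically
in both. For reference, bucketing through the DDL path uses CarbonData's
TBLPROPERTIES syntax (names illustrative):

    sql(
      """
        | CREATE TABLE bucketed_src(id int, name string)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')
      """.stripMargin)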

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b9e273/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 3d814a1..52e3764 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.carbondata
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SaveMode}
@@ -42,7 +44,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
          |    stringField string,
          |    decimalField decimal(13, 0),
          |    timestampField string)
-         | USING org.apache.spark.sql.CarbonSource
+         | USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='carbon_testtable')
        """.stripMargin)
 
     sql(
@@ -192,4 +194,67 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test create table with complex datatype") {
+    sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='create_source')")
+    sql("drop table create_source")
+  }
+
+  test("test to create bucket columns with int field") {
+    sql("drop table if exists create_source")
+    intercept[Exception] {
+      sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='intField','tableName'='create_source')")
+    }
+  }
+
+  test("test to create bucket columns with complex data type field") {
+    sql("drop table if exists create_source")
+    intercept[Exception] {
+      sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='complexField','tableName'='create_source')")
+    }
+  }
+
+  test("test check results of table with complex data type and bucketing") {
+    sql("drop table if exists create_source")
+    sql("create table create_source(intField int, stringField string, complexField array<int>) USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='1', 'bucketcolumns'='stringField', 'tableName'='create_source')")
+    sql("""insert into create_source values(1,"source","1$2$3")""")
+    checkAnswer(sql("select * from create_source"), Row(1,"source", mutable.WrappedArray.newBuilder[Int].+=(1,2,3)))
+    sql("drop table if exists create_source")
+  }
+
+  test("test create table without tableName in options") {
+    sql("drop table if exists carbon_test")
+    val exception = intercept[RuntimeException] {
+      sql(
+        s"""
+           | CREATE TABLE carbon_test(
+           |    stringField string,
+           |    intField int)
+           | USING org.apache.spark.sql.CarbonSource
+           | OPTIONS('DICTIONARY_EXCLUDE'='stringField')
+      """.
+          stripMargin
+      )
+    }.getMessage
+    sql("drop table if exists carbon_test")
+    assert(exception.equals("Table creation failed. Table name is not specified"))
+  }
+
+  test("test create table with space in tableName") {
+    sql("drop table if exists carbon_test")
+    val exception = intercept[RuntimeException] {
+      sql(
+        s"""
+           | CREATE TABLE carbon_test(
+           |    stringField string,
+           |    intField int)
+           | USING org.apache.spark.sql.CarbonSource
+           | OPTIONS('DICTIONARY_EXCLUDE'='stringField', 'tableName'='carbon test')
+      """.
+          stripMargin
+      )
+    }.getMessage
+    sql("drop table if exists carbon_test")
+    assert(exception.equals("Table creation failed. Table name cannot contain blank space"))
+  }
+
 }