You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:41 UTC

[45/50] [abbrv] carbondata git commit: [CARBONDATA-1933] Support Spark 2.2.1 in carbon partition tables

[CARBONDATA-1933] Support Spark 2.2.1 in carbon partition tables

This closes #1716


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f0123e79
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f0123e79
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f0123e79

Branch: refs/heads/fgdatamap
Commit: f0123e795b3ad9f5412920fadb453db31754dcfc
Parents: 9a2159b
Author: ravipesala <ra...@gmail.com>
Authored: Fri Dec 22 14:47:53 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 23 21:23:16 2017 +0800

----------------------------------------------------------------------
 .../StandardPartitionTableQueryTestCase.scala   |  44 +++++++-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  29 ++++-
 .../management/CarbonLoadDataCommand.scala      | 109 +++++++++++--------
 .../datasources/CarbonFileFormat.scala          |   7 +-
 4 files changed, 136 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 8670162..b3c91ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -16,9 +16,9 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -63,7 +63,6 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
       "select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
       " deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
       "from partitionone where empno=11 order by empno")
-
     verifyPartitionInfo(frame, Seq("empno=11"))
 
     checkAnswer(frame,
@@ -186,6 +185,43 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
 
   }
 
+  test("badrecords on partition column") {
+    sql("create table badrecordsPartition(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')")
+    sql("select count(*) from badrecordsPartition").show()
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is null"), Seq(Row(9)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is not null"), Seq(Row(2)))
+  }
+
+  test("badrecords fail on partition column") {
+    sql("create table badrecordsPartitionfail(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    intercept[Exception] {
+      sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfail options('bad_records_action'='fail')")
+
+    }
+  }
+
+  test("badrecords ignore on partition column") {
+    sql("create table badrecordsPartitionignore(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionignore options('bad_records_action'='ignore')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), Seq(Row(3)))
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), Seq(Row(2)))
+  }
+
+  test("static column partition with load command") {
+    sql(
+      """
+        | CREATE TABLE staticpartitionload (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+  }
+
 
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -209,6 +245,10 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partitionmany")
     sql("drop table if exists partitiondate")
     sql("drop table if exists partitiondateinsert")
+    sql("drop table if exists badrecordsPartition")
+    sql("drop table if exists staticpartitionload")
+    sql("drop table if exists badrecordsPartitionignore")
+    sql("drop table if exists badrecordsPartitionfail")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 4c405a4..9fcb98f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -154,11 +155,24 @@ object CarbonScalaUtil {
     }
   }
 
+  /**
+   * Converts incoming value to UTF8String after converting data as per the data type.
+   * @param value Input value to convert
+   * @param dataType Datatype to convert and then convert to UTF8String
+   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
+   * @param dateFormat Date format to convert in case of DateType datatype
+   * @param serializationNullFormat if this is encountered in input data then data will
+   *                                be treated as null
+   * @param fail If it is true then any conversion error will throw error otherwise it will be
+   *                   filled with null value
+   * @return converted UTF8String
+   */
   def convertToUTF8String(value: String,
       dataType: DataType,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
-      serializationNullFormat: String): UTF8String = {
+      serializationNullFormat: String,
+      fail: Boolean): UTF8String = {
     if (value == null || serializationNullFormat.equals(value)) {
       return UTF8String.fromString(value)
     }
@@ -171,11 +185,22 @@ object CarbonScalaUtil {
           UTF8String.fromString(
             DateTimeUtils.dateToString(
               (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
+        case ShortType => UTF8String.fromString(value.toShort.toString)
+        case IntegerType => UTF8String.fromString(value.toInt.toString)
+        case LongType => UTF8String.fromString(value.toLong.toString)
+        case DoubleType => UTF8String.fromString(value.toDouble.toString)
+        case FloatType => UTF8String.fromString(value.toFloat.toString)
+        case d: DecimalType => UTF8String
+          .fromString(new java.math.BigDecimal(value).toPlainString)
+        case BooleanType => UTF8String.fromString(value.toBoolean.toString)
         case _ => UTF8String.fromString(value)
       }
     } catch {
       case e: Exception =>
-        UTF8String.fromString(value)
+        if (fail) {
+          throw e
+        }
+        UTF8String.fromString(null)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5eee0bf..55c8769 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -67,7 +67,7 @@ import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
@@ -463,13 +463,16 @@ case class CarbonLoadDataCommand(
     val logicalPlan =
       sparkSession.sessionState.catalog.lookupRelation(
         identifier)
-    val relation = logicalPlan.collect {
-      case l: LogicalRelation => l
+    val catalogTable: CatalogTable = logicalPlan.collect {
+      case l: LogicalRelation => l.catalogTable.get
       case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
         if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
             c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
             c.getClass.getName.equals(
-              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => c
+              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+        CarbonReflectionUtils.getFieldOfCatalogTable(
+          "tableMeta",
+          c).asInstanceOf[CatalogTable]
     }.head
     // Clean up the old invalid segment data.
     DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
@@ -481,6 +484,11 @@ case class CarbonLoadDataCommand(
     // converted to hive standard fomat to let spark understand the data to partition.
     val serializationNullFormat =
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+    val failAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+    val ignoreAction =
+      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
     val query: LogicalPlan = if (dataFrame.isDefined) {
       var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
       val timeStampFormat = new SimpleDateFormat(timeStampformatString)
@@ -513,7 +521,7 @@ case class CarbonLoadDataCommand(
         sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
         // In case of update, we don't need the segmrntid column in case of partitioning
         val dropAttributes = attributes.dropRight(1)
-        val finalOutput = relation.output.map { attr =>
+        val finalOutput = catalogTable.schema.map { attr =>
           dropAttributes.find { d =>
             val index = d.name.lastIndexOf("-updatedColumn")
             if (index > 0) {
@@ -548,13 +556,15 @@ case class CarbonLoadDataCommand(
         StructType(carbonLoadModel.getCsvHeaderColumns.map(
           StructField(_, StringType))).toAttributes
       val rowDataTypes = attributes.map { attribute =>
-        relation.output.find(_.name.equalsIgnoreCase(attribute.name)) match {
+        catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
           case Some(attr) => attr.dataType
           case _ => StringType
         }
       }
       val len = rowDataTypes.length
-      val rdd =
+      // Fail row conversion if fail/ignore badrecord action is enabled
+      val fail = failAction || ignoreAction
+      var rdd =
         new NewHadoopRDD[NullWritable, StringArrayWritable](
           sparkSession.sparkContext,
           classOf[CSVInputFormat],
@@ -564,55 +574,60 @@ case class CarbonLoadDataCommand(
             val data = new Array[Any](len)
             var i = 0
             val input = value.get()
-            while (i < input.length) {
-              // TODO find a way to avoid double conversion of date and time.
-              data(i) = CarbonScalaUtil.convertToUTF8String(
-                input(i),
-                rowDataTypes(i),
-                timeStampFormat,
-                dateFormat,
-                serializationNullFormat)
-              i = i + 1
+            val inputLen = Math.min(input.length, len)
+            try {
+              while (i < inputLen) {
+                // TODO find a way to avoid double conversion of date and time.
+                data(i) = CarbonScalaUtil.convertToUTF8String(
+                  input(i),
+                  rowDataTypes(i),
+                  timeStampFormat,
+                  dateFormat,
+                  serializationNullFormat,
+                  fail)
+                i = i + 1
+              }
+              InternalRow.fromSeq(data)
+            } catch {
+              case e: Exception =>
+                if (failAction) {
+                  // It is badrecord fail case.
+                  throw new BadRecordFoundException(
+                    s"Data load failed due to bad record: " +
+                    s"${input(i)} with datatype ${rowDataTypes(i)}")
+                } else {
+                  // It is bad record ignore case
+                  InternalRow.empty
+                }
             }
-            InternalRow.fromSeq(data)
+
         }
+      // In bad record ignore case filter the empty values
+      if (ignoreAction) {
+        rdd = rdd.filter(f => f.numFields != 0)
+      }
 
       // Only select the required columns
       val output = if (partition.nonEmpty) {
-        relation.output.map{ attr =>
+        catalogTable.schema.map{ attr =>
           attributes.find(_.name.equalsIgnoreCase(attr.name)).get
         }.filter(attr => partition.get(attr.name).isEmpty)
       } else {
-        relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+        catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
       }
       Project(output, LogicalRDD(attributes, rdd)(sparkSession))
     }
-    val convertRelation = relation match {
-      case l: LogicalRelation =>
-        convertToLogicalRelation(
-          l.catalogTable.get,
-          l.output,
-          l.relation.sizeInBytes,
-          isOverwriteTable,
-          carbonLoadModel,
-          sparkSession)
-      case others =>
-        val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
-          "tableMeta",
-          relation).asInstanceOf[CatalogTable]
-        // TODO need to find a way to avoid double lookup
-        val sizeInBytes =
-          CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-            catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
-        val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
-        convertToLogicalRelation(
-          catalogTable,
-          others.output,
-          sizeInBytes,
-          isOverwriteTable,
-          carbonLoadModel,
-          sparkSession)
-    }
+    // TODO need to find a way to avoid double lookup
+    val sizeInBytes =
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+        catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+    val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+    val convertRelation = convertToLogicalRelation(
+      catalogTable,
+      sizeInBytes,
+      isOverwriteTable,
+      carbonLoadModel,
+      sparkSession)
     val convertedPlan =
       CarbonReflectionUtils.getInsertIntoCommand(
         convertRelation,
@@ -629,14 +644,12 @@ case class CarbonLoadDataCommand(
 
   private def convertToLogicalRelation(
       catalogTable: CatalogTable,
-      output: Seq[Attribute],
       sizeInBytes: Long,
       overWrite: Boolean,
       loadModel: CarbonLoadModel,
       sparkSession: SparkSession): LogicalRelation = {
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metastoreSchema = StructType(StructType.fromAttributes(
-      output).fields.map(_.copy(dataType = StringType)))
+    val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType)))
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val catalog = new CatalogFileIndex(
       sparkSession, catalogTable, sizeInBytes)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index b9df7a1..4b338a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -192,7 +192,12 @@ private class CarbonOutputWriter(path: String,
     partitions.map{ p =>
       val splitData = p.split("=")
       if (splitData.length > 1) {
-        splitData(1)
+        // Null handling case. For null values Hive creates the partition with this special name
+        if (splitData(1).equals("__HIVE_DEFAULT_PARTITION__")) {
+          null
+        } else {
+          splitData(1)
+        }
       } else {
         ""
       }