Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:41 UTC
[45/50] [abbrv] carbondata git commit: [CARBONDATA-1933] Support Spark 2.2.1 in carbon partition tables
[CARBONDATA-1933] Support Spark 2.2.1 in carbon partition tables
This closes #1716
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f0123e79
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f0123e79
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f0123e79
Branch: refs/heads/fgdatamap
Commit: f0123e795b3ad9f5412920fadb453db31754dcfc
Parents: 9a2159b
Author: ravipesala <ra...@gmail.com>
Authored: Fri Dec 22 14:47:53 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 23 21:23:16 2017 +0800
----------------------------------------------------------------------
.../StandardPartitionTableQueryTestCase.scala | 44 +++++++-
.../carbondata/spark/util/CarbonScalaUtil.scala | 29 ++++-
.../management/CarbonLoadDataCommand.scala | 109 +++++++++++--------
.../datasources/CarbonFileFormat.scala | 7 +-
4 files changed, 136 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 8670162..b3c91ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -16,9 +16,9 @@
*/
package org.apache.carbondata.spark.testsuite.standardpartition
-import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -63,7 +63,6 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
"select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno," +
" deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary " +
"from partitionone where empno=11 order by empno")
-
verifyPartitionInfo(frame, Seq("empno=11"))
checkAnswer(frame,
@@ -186,6 +185,43 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
}
+ test("badrecords on partition column") {
+ sql("create table badrecordsPartition(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+ sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')")
+ sql("select count(*) from badrecordsPartition").show()
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is null"), Seq(Row(9)))
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intfield2 is not null"), Seq(Row(2)))
+ }
+
+ test("badrecords fail on partition column") {
+ sql("create table badrecordsPartitionfail(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+ intercept[Exception] {
+ sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionfail options('bad_records_action'='fail')")
+
+ }
+ }
+
+ test("badrecords ignore on partition column") {
+ sql("create table badrecordsPartitionignore(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+ sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionignore options('bad_records_action'='ignore')")
+ checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is null"), Seq(Row(3)))
+ checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), Seq(Row(2)))
+ }
+
+ test("static column partition with load command") {
+ sql(
+ """
+ | CREATE TABLE staticpartitionload (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp,attendance int,
+ | deptname String,projectcode int,
+ | utilization int,salary int,projectenddate Date,doj Timestamp)
+ | PARTITIONED BY (empname String)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ }
+
private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
val plan = frame.queryExecution.sparkPlan
@@ -209,6 +245,10 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
sql("drop table if exists partitionmany")
sql("drop table if exists partitiondate")
sql("drop table if exists partitiondateinsert")
+ sql("drop table if exists badrecordsPartition")
+ sql("drop table if exists staticpartitionload")
+ sql("drop table if exists badrecordsPartitionignore")
+ sql("drop table if exists badrecordsPartitionfail")
}
}
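The three new tests above cover the bad-record handling modes on a partition column. As a quick reference, their expected behaviour on the same input file is summarised below (a hedged summary drawn only from the assertions in the tests):

  // 'force'  -> unconvertible partition values are loaded as null (the test expects 9 null / 2 non-null rows)
  // 'fail'   -> the load aborts with an exception on the first unconvertible value
  // 'ignore' -> rows with unconvertible partition values are dropped (the test expects 3 null / 2 non-null rows)
  sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' " +
    "into table badrecordsPartition options('bad_records_action'='force')")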
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 4c405a4..9fcb98f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException
object CarbonScalaUtil {
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -154,11 +155,24 @@ object CarbonScalaUtil {
}
}
+ /**
+ * Converts the incoming value to UTF8String after converting it as per the data type.
+ * @param value Input value to convert
+ * @param dataType Datatype to convert to before converting to UTF8String
+ * @param timeStampFormat Timestamp format to use for timestamp data types
+ * @param dateFormat Date format to use for the DateType data type
+ * @param serializationNullFormat if this value is encountered in the input data then it
+ * will be treated as null
+ * @param fail If true then any conversion error will throw an error, otherwise the value
+ * will be filled with null
+ * @return converted UTF8String
+ */
def convertToUTF8String(value: String,
dataType: DataType,
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
- serializationNullFormat: String): UTF8String = {
+ serializationNullFormat: String,
+ fail: Boolean): UTF8String = {
if (value == null || serializationNullFormat.equals(value)) {
return UTF8String.fromString(value)
}
@@ -171,11 +185,22 @@ object CarbonScalaUtil {
UTF8String.fromString(
DateTimeUtils.dateToString(
(dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
+ case ShortType => UTF8String.fromString(value.toShort.toString)
+ case IntegerType => UTF8String.fromString(value.toInt.toString)
+ case LongType => UTF8String.fromString(value.toLong.toString)
+ case DoubleType => UTF8String.fromString(value.toDouble.toString)
+ case FloatType => UTF8String.fromString(value.toFloat.toString)
+ case d: DecimalType => UTF8String
+ .fromString(new java.math.BigDecimal(value).toPlainString)
+ case BooleanType => UTF8String.fromString(value.toBoolean.toString)
case _ => UTF8String.fromString(value)
}
} catch {
case e: Exception =>
- UTF8String.fromString(value)
+ if (fail) {
+ throw e
+ }
+ UTF8String.fromString(null)
}
}
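A minimal usage sketch of the extended convertToUTF8String signature follows. The date/timestamp formats and the "\\N" null marker are illustrative assumptions, not values taken from this commit; only the signature and the fail semantics come from the diff above.

  import java.text.SimpleDateFormat
  import org.apache.spark.sql.types.IntegerType
  import org.apache.carbondata.spark.util.CarbonScalaUtil

  val timeStampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")  // assumed format
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")                // assumed format

  // fail = false: a malformed value is converted to a null UTF8String (force/ignore paths)
  val lenient = CarbonScalaUtil.convertToUTF8String(
    "not-a-number", IntegerType, timeStampFormat, dateFormat, "\\N", fail = false)

  // fail = true: the conversion error is rethrown so the caller can abort the load
  // CarbonScalaUtil.convertToUTF8String(
  //   "not-a-number", IntegerType, timeStampFormat, dateFormat, "\\N", fail = true)  // throws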
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5eee0bf..55c8769 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -67,7 +67,7 @@ import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
@@ -463,13 +463,16 @@ case class CarbonLoadDataCommand(
val logicalPlan =
sparkSession.sessionState.catalog.lookupRelation(
identifier)
- val relation = logicalPlan.collect {
- case l: LogicalRelation => l
+ val catalogTable: CatalogTable = logicalPlan.collect {
+ case l: LogicalRelation => l.catalogTable.get
case c // To make compatible with spark 2.1 and 2.2 we need to compare classes
if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
c.getClass.getName.equals(
- "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => c
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+ CarbonReflectionUtils.getFieldOfCatalogTable(
+ "tableMeta",
+ c).asInstanceOf[CatalogTable]
}.head
// Clean up the old invalid segment data.
DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
@@ -481,6 +484,11 @@ case class CarbonLoadDataCommand(
// converted to hive standard format to let spark understand the data to partition.
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ val failAction =
+ carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+ val ignoreAction =
+ carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
val query: LogicalPlan = if (dataFrame.isDefined) {
var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
val timeStampFormat = new SimpleDateFormat(timeStampformatString)
@@ -513,7 +521,7 @@ case class CarbonLoadDataCommand(
sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
// In case of update, we don't need the segmentid column in case of partitioning
val dropAttributes = attributes.dropRight(1)
- val finalOutput = relation.output.map { attr =>
+ val finalOutput = catalogTable.schema.map { attr =>
dropAttributes.find { d =>
val index = d.name.lastIndexOf("-updatedColumn")
if (index > 0) {
@@ -548,13 +556,15 @@ case class CarbonLoadDataCommand(
StructType(carbonLoadModel.getCsvHeaderColumns.map(
StructField(_, StringType))).toAttributes
val rowDataTypes = attributes.map { attribute =>
- relation.output.find(_.name.equalsIgnoreCase(attribute.name)) match {
+ catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
case Some(attr) => attr.dataType
case _ => StringType
}
}
val len = rowDataTypes.length
- val rdd =
+ // Fail row conversion if fail/ignore badrecord action is enabled
+ val fail = failAction || ignoreAction
+ var rdd =
new NewHadoopRDD[NullWritable, StringArrayWritable](
sparkSession.sparkContext,
classOf[CSVInputFormat],
@@ -564,55 +574,60 @@ case class CarbonLoadDataCommand(
val data = new Array[Any](len)
var i = 0
val input = value.get()
- while (i < input.length) {
- // TODO find a way to avoid double conversion of date and time.
- data(i) = CarbonScalaUtil.convertToUTF8String(
- input(i),
- rowDataTypes(i),
- timeStampFormat,
- dateFormat,
- serializationNullFormat)
- i = i + 1
+ val inputLen = Math.min(input.length, len)
+ try {
+ while (i < inputLen) {
+ // TODO find a way to avoid double conversion of date and time.
+ data(i) = CarbonScalaUtil.convertToUTF8String(
+ input(i),
+ rowDataTypes(i),
+ timeStampFormat,
+ dateFormat,
+ serializationNullFormat,
+ fail)
+ i = i + 1
+ }
+ InternalRow.fromSeq(data)
+ } catch {
+ case e: Exception =>
+ if (failAction) {
+ // It is badrecord fail case.
+ throw new BadRecordFoundException(
+ s"Data load failed due to bad record: " +
+ s"${input(i)} with datatype ${rowDataTypes(i)}")
+ } else {
+ // It is bad record ignore case
+ InternalRow.empty
+ }
}
- InternalRow.fromSeq(data)
+
}
+ // In bad record ignore case filter the empty values
+ if (ignoreAction) {
+ rdd = rdd.filter(f => f.numFields != 0)
+ }
// Only select the required columns
val output = if (partition.nonEmpty) {
- relation.output.map{ attr =>
+ catalogTable.schema.map{ attr =>
attributes.find(_.name.equalsIgnoreCase(attr.name)).get
}.filter(attr => partition.get(attr.name).isEmpty)
} else {
- relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+ catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
}
Project(output, LogicalRDD(attributes, rdd)(sparkSession))
}
- val convertRelation = relation match {
- case l: LogicalRelation =>
- convertToLogicalRelation(
- l.catalogTable.get,
- l.output,
- l.relation.sizeInBytes,
- isOverwriteTable,
- carbonLoadModel,
- sparkSession)
- case others =>
- val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
- "tableMeta",
- relation).asInstanceOf[CatalogTable]
- // TODO need to find a way to avoid double lookup
- val sizeInBytes =
- CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
- catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
- val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
- convertToLogicalRelation(
- catalogTable,
- others.output,
- sizeInBytes,
- isOverwriteTable,
- carbonLoadModel,
- sparkSession)
- }
+ // TODO need to find a way to avoid double lookup
+ val sizeInBytes =
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+ catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+ val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+ val convertRelation = convertToLogicalRelation(
+ catalogTable,
+ sizeInBytes,
+ isOverwriteTable,
+ carbonLoadModel,
+ sparkSession)
val convertedPlan =
CarbonReflectionUtils.getInsertIntoCommand(
convertRelation,
@@ -629,14 +644,12 @@ case class CarbonLoadDataCommand(
private def convertToLogicalRelation(
catalogTable: CatalogTable,
- output: Seq[Attribute],
sizeInBytes: Long,
overWrite: Boolean,
loadModel: CarbonLoadModel,
sparkSession: SparkSession): LogicalRelation = {
val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
- val metastoreSchema = StructType(StructType.fromAttributes(
- output).fields.map(_.copy(dataType = StringType)))
+ val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType)))
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val catalog = new CatalogFileIndex(
sparkSession, catalogTable, sizeInBytes)
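For orientation, here is a simplified, standalone sketch of the per-row bad-record behaviour introduced above. It is not the code from this commit: toInt stands in for CarbonScalaUtil.convertToUTF8String, Option stands in for InternalRow, and RuntimeException stands in for BadRecordFoundException.

  // Only the fail/ignore actions make the per-value conversion throw; otherwise bad values become null.
  def convertRow(input: Array[String],
      failAction: Boolean,
      ignoreAction: Boolean): Option[Seq[Any]] = {
    val strict = failAction || ignoreAction
    try {
      Some(input.toSeq.map { v =>
        try { v.toInt.toString } catch { case e: Exception => if (strict) throw e else null }
      })
    } catch {
      case e: Exception =>
        if (failAction) {
          throw new RuntimeException("Data load failed due to bad record", e)
        } else {
          None  // ignore case: the empty row is filtered out of the RDD afterwards
        }
    }
  }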
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0123e79/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index b9df7a1..4b338a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -192,7 +192,12 @@ private class CarbonOutputWriter(path: String,
partitions.map{ p =>
val splitData = p.split("=")
if (splitData.length > 1) {
- splitData(1)
+ // Null handling case. For a null partition value Hive creates the directory with this special name
+ if (splitData(1).equals("__HIVE_DEFAULT_PARTITION__")) {
+ null
+ } else {
+ splitData(1)
+ }
} else {
""
}
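For reference, a standalone sketch of the null-partition handling added above ("__HIVE_DEFAULT_PARTITION__" is the marker Hive uses for a null partition value; the function name below is illustrative):

  def partitionValue(segment: String): String = {
    val splitData = segment.split("=")
    if (splitData.length > 1) {
      // Hive encodes a null partition value with this special directory name
      if (splitData(1).equals("__HIVE_DEFAULT_PARTITION__")) null else splitData(1)
    } else {
      ""
    }
  }
  // partitionValue("intfield2=3")                          returns "3"
  // partitionValue("intfield2=__HIVE_DEFAULT_PARTITION__") returns null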