You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/15 13:51:49 UTC
[1/2] carbondata git commit: fix NPE for partition loading
Repository: carbondata
Updated Branches:
refs/heads/master 9a11440cc -> 48cd1f679
fix NPE for partition loading
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e21debe7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e21debe7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e21debe7
Branch: refs/heads/master
Commit: e21debe79ded0e6e166b6182278e17148e2e8f5d
Parents: 9a11440
Author: QiangCai <da...@gmail.com>
Authored: Fri Jun 9 17:14:23 2017 +0800
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Jun 15 19:19:35 2017 +0530
----------------------------------------------------------------------
.../resources/data_partition_badrecords.csv | 12 ++++++++
.../TestDataLoadingForPartitionTable.scala | 13 +++++++-
.../spark/rdd/CarbonDataRDDFactory.scala | 32 +++++++++++++++-----
.../spark/rdd/CarbonDataRDDFactory.scala | 32 +++++++++++++++-----
4 files changed, 74 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv b/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
new file mode 100644
index 0000000..a6dc7f9
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data_partition_badrecords.csv
@@ -0,0 +1,12 @@
+intField1, stringField1, intField2
+
+,
+,,
+1,
+2,b
+3,c,13
+4,d,14,d
+5,e,
+6,f, ,16
+7,g,g
+8,h,h,18
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 8a35558..b5858b4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,13 +18,13 @@ package org.apache.carbondata.spark.testsuite.partition
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.Row
class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
@@ -281,6 +281,16 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
}
+ test("badrecords on partition column") {
+ sql("create table badrecordsPartition(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata' tblproperties('partition_type'='hash', 'num_partitions'='5')")
+ sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartition options('bad_records_action'='force')")
+
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 = 13"), Seq(Row(1)))
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 = 14"), Seq(Row(1)))
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 is null"), Seq(Row(9)))
+ checkAnswer(sql("select count(*) cnt from badrecordsPartition where intField2 is not null"), Seq(Row(2)))
+ }
+
override def afterAll = {
dropTable
if (defaultTimestampFormat == null) {
@@ -306,6 +316,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
sql("drop table if exists multiInserts")
sql("drop table if exists loadAndInsert")
sql("drop table if exists listTableUpper")
+ sql("drop table if exists badrecordsPartition")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index de0fd5a..3dcf8af 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1001,9 +1001,12 @@ object CarbonDataRDDFactory {
val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
val columns = carbonLoadModel.getCsvHeaderColumns
var partitionColumnIndex = -1
- for (i <- 0 until columns.length) {
- if (partitionColumn.equalsIgnoreCase(columns(i))) {
- partitionColumnIndex = i
+ breakable {
+ for (i <- 0 until columns.length) {
+ if (partitionColumn.equalsIgnoreCase(columns(i))) {
+ partitionColumnIndex = i
+ break
+ }
}
}
if (partitionColumnIndex == -1) {
@@ -1023,8 +1026,13 @@ object CarbonDataRDDFactory {
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
- (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ if (null != row && row.length > partitionColumnIndex &&
+ null != row.get(partitionColumnIndex)) {
+ (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ } else {
+ (null, row)
+ }
}
} else {
// input data from csv files
@@ -1039,8 +1047,18 @@ object CarbonDataRDDFactory {
classOf[StringArrayWritable],
hadoopConfiguration
).map { currentRow =>
- val row = new StringArrayRow(new Array[String](columnCount))
- (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ if (null == currentRow || null == currentRow._2) {
+ val row = new StringArrayRow(new Array[String](columnCount))
+ (null, row)
+ } else {
+ val row = new StringArrayRow(new Array[String](columnCount))
+ val values = currentRow._2.get()
+ if (values != null && values.length > partitionColumnIndex) {
+ (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ } else {
+ (null, row.setValues(currentRow._2.get()))
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e21debe7/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8871f3b..271b56b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1025,9 +1025,12 @@ object CarbonDataRDDFactory {
val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
val columns = carbonLoadModel.getCsvHeaderColumns
var partitionColumnIndex = -1
- for (i <- 0 until columns.length) {
- if (partitionColumn.equalsIgnoreCase(columns(i))) {
- partitionColumnIndex = i
+ breakable {
+ for (i <- 0 until columns.length) {
+ if (partitionColumn.equalsIgnoreCase(columns(i))) {
+ partitionColumnIndex = i
+ break
+ }
}
}
if (partitionColumnIndex == -1) {
@@ -1047,8 +1050,13 @@ object CarbonDataRDDFactory {
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
- (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ if (null != row && row.length > partitionColumnIndex &&
+ null != row.get(partitionColumnIndex)) {
+ (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ } else {
+ (null, row)
+ }
}
} else {
// input data from csv files
@@ -1063,8 +1071,18 @@ object CarbonDataRDDFactory {
classOf[StringArrayWritable],
hadoopConfiguration
).map { currentRow =>
- val row = new StringArrayRow(new Array[String](columnCount))
- (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ if (null == currentRow || null == currentRow._2) {
+ val row = new StringArrayRow(new Array[String](columnCount))
+ (null, row)
+ } else {
+ val row = new StringArrayRow(new Array[String](columnCount))
+ val values = currentRow._2.get()
+ if (values != null && values.length > partitionColumnIndex) {
+ (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+ } else {
+ (null, row.setValues(currentRow._2.get()))
+ }
+ }
}
}
[2/2] carbondata git commit: [CARBONDATA-1178] Data loading with bad records is throwing NPE on partitioned table. This closes #1036
Posted by gv...@apache.org.
[CARBONDATA-1178] Data loading with bad records is throwing NPE on partitioned table. This closes #1036
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/48cd1f67
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/48cd1f67
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/48cd1f67
Branch: refs/heads/master
Commit: 48cd1f679a529af07f6f38a4cdf3174c917be227
Parents: 9a11440 e21debe
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Thu Jun 15 19:21:16 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Jun 15 19:21:16 2017 +0530
----------------------------------------------------------------------
.../resources/data_partition_badrecords.csv | 12 ++++++++
.../TestDataLoadingForPartitionTable.scala | 13 +++++++-
.../spark/rdd/CarbonDataRDDFactory.scala | 32 +++++++++++++++-----
.../spark/rdd/CarbonDataRDDFactory.scala | 32 +++++++++++++++-----
4 files changed, 74 insertions(+), 15 deletions(-)
----------------------------------------------------------------------