You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 18:54:51 UTC
[carbondata] branch master updated: [CARBONDATA-3377] Fix for Null
pointer exception in Range Col compaction
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 9932a6d [CARBONDATA-3377] Fix for Null pointer exception in Range Col compaction
9932a6d is described below
commit 9932a6d768c2f6bff70ac861d20a403cecc640b5
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Fri May 10 15:43:10 2019 +0530
[CARBONDATA-3377] Fix for Null pointer exception in Range Col compaction
Problem: A string-type range column containing huge strings and null values fails with a NullPointerException when compaction is done.
Solution: Added a check in StringOrdering for null values.
This closes #3212
---
.../core/constants/CarbonCommonConstants.java | 4 +++
.../carbondata/core/util/CarbonProperties.java | 6 ++++
.../dataload/TestRangeColumnDataLoad.scala | 42 +++++++++++++++++++++-
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 16 ++++++---
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 7 ++--
5 files changed, 67 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ba8e20a..43544cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1193,6 +1193,10 @@ public final class CarbonCommonConstants {
public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
+ public static final String CARBON_ENABLE_RANGE_COMPACTION = "carbon.enable.range.compaction";
+
+ public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "false";
+
//////////////////////////////////////////////////////////////////////////////////////////
// Query parameter start here
//////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004a51e..e26f3d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1507,6 +1507,12 @@ public final class CarbonProperties {
return Boolean.parseBoolean(pushFilters);
}
+ public boolean isRangeCompactionAllowed() {
+ String isRangeCompact = getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
+ CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
+ return Boolean.parseBoolean(isRangeCompact);
+ }
+
private void validateSortMemorySpillPercentage() {
String spillPercentageStr = carbonProperties.getProperty(
CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 5d6730f..165e4f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -610,6 +610,34 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
sql("DROP TABLE IF EXISTS carbon_range_column1")
}
+ test("Test compaction for range_column - STRING Datatype null values") {
+ sql("DROP TABLE IF EXISTS carbon_range_column1")
+ deleteFile(filePath2)
+ createFile(filePath2, 20, 14)
+ sql(
+ """
+ | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='city',
+ | 'range_column'='city')
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+ "OPTIONS('BAD_RECORDS_ACTION'='FORCE','HEADER'='false')")
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+ "OPTIONS('BAD_RECORDS_ACTION'='FORCE','HEADER'='false')")
+
+ var res = sql("select * from carbon_range_column1").collect()
+
+ sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+ checkAnswer(sql("select * from carbon_range_column1"), res)
+
+ sql("DROP TABLE IF EXISTS carbon_range_column1")
+ deleteFile(filePath2)
+ }
+
test("Test compaction for range_column - STRING Datatype min/max not stored") {
deleteFile(filePath2)
createFile(filePath2, 1000, 7)
@@ -930,12 +958,24 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
.println(
100 + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
}
- } else if (9 <= lastCol) {
+ } else if (9 <= lastCol && 13 >= lastCol) {
for (i <- lastCol until (lastCol + line)) {
write
.println(
i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
}
+ } else if (14 == lastCol) {
+ // Null data generation for string col
+ for (i <- lastCol until (lastCol + line)) {
+ if (i % 3 != 0) {
+ write
+ .println(
+ i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+ } else {
+ write
+ .println(i + ",")
+ }
+ }
}
write.close()
} catch {
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index a751887..81699b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -311,7 +311,7 @@ object DataLoadProcessBuilderOnSpark {
// better to generate a CarbonData file for each partition
val totalSize = model.getTotalSize.toDouble
val table = model.getCarbonDataLoadSchema.getCarbonTable
- numPartitions = getNumPatitionsBasedOnSize(totalSize, table, model)
+ numPartitions = getNumPatitionsBasedOnSize(totalSize, table, model, false)
}
}
numPartitions
@@ -319,10 +319,13 @@ object DataLoadProcessBuilderOnSpark {
def getNumPatitionsBasedOnSize(totalSize: Double,
table: CarbonTable,
- model: CarbonLoadModel): Int = {
+ model: CarbonLoadModel,
+ mergerFlag: Boolean): Int = {
val blockSize = 1024L * 1024 * table.getBlockSizeInMB
val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
- val scaleFactor = if (model.getScaleFactor == 0) {
+ val scaleFactor = if (mergerFlag) {
+ 1
+ } else if (model.getScaleFactor == 0) {
// use system properties
CarbonProperties.getInstance().getRangeColumnScaleFactor
} else {
@@ -385,6 +388,11 @@ class ByteArrayOrdering() extends Ordering[Object] {
class StringOrdering() extends Ordering[Object] {
override def compare(x: Object, y: Object): Int = {
- (x.asInstanceOf[UTF8String]).compare(y.asInstanceOf[UTF8String])
+ if (x == null) {
+ return -1
+ } else if (y == null) {
+ return 1
+ }
+ return (x.asInstanceOf[UTF8String]).compare(y.asInstanceOf[UTF8String])
}
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c143f93..4f4386b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.core.scan.expression
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -297,7 +297,8 @@ class CarbonMergerRDD[K, V](
)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
var rangeColumn: CarbonColumn = null
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+ if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
+ !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
// If the table is not a partition table then only we go for range column compaction flow
rangeColumn = carbonTable.getRangeColumn
}
@@ -395,7 +396,7 @@ class CarbonMergerRDD[K, V](
// To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
val numOfPartitions = Math
.max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
- .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel))
+ .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, true))
val colName = rangeColumn.getColName
LOGGER.info(s"Compacting on range column: $colName")
allRanges = getRangesFromRDD(rangeColumn,