Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 19:05:54 UTC

[carbondata] 21/22: [CARBONDATA-3377] Fix for Null pointer exception in Range Col compaction

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 4abed04bfefe7a24f18ab42fd96d63a617a26596
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Fri May 10 15:43:10 2019 +0530

    [CARBONDATA-3377] Fix for Null pointer exception in Range Col compaction
    
    Problem: Compaction fails with a NullPointerException when the range column is a STRING column that contains huge strings and null values.
    
    Solution: Added a null check in StringOrdering so that null values compare as smaller than any string instead of being dereferenced.
    
    This closes #3212
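    
    For context, a minimal Scala sketch of how the pre-patch comparator hits the
    NPE (illustrative only, not part of this patch; UTF8String is Spark's
    org.apache.spark.unsafe.types.UTF8String):
    
        import org.apache.spark.unsafe.types.UTF8String
    
        // Pre-patch comparator body:
        //   x.asInstanceOf[UTF8String].compare(y.asInstanceOf[UTF8String])
        // When the range column holds a null, x is null and the call dereferences it:
        val x: UTF8String = null
        x.compare(UTF8String.fromString("abc")) // throws NullPointerException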
---
 .../core/constants/CarbonCommonConstants.java      |  4 +++
 .../carbondata/core/util/CarbonProperties.java     |  6 ++++
 .../dataload/TestRangeColumnDataLoad.scala         | 42 +++++++++++++++++++++-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 16 ++++++---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  7 ++--
 5 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ba8e20a..43544cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1193,6 +1193,10 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
 
+  public static final String CARBON_ENABLE_RANGE_COMPACTION = "carbon.enable.range.compaction";
+
+  public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "false";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Query parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 004a51e..e26f3d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1507,6 +1507,12 @@ public final class CarbonProperties {
     return Boolean.parseBoolean(pushFilters);
   }
 
+  public boolean isRangeCompactionAllowed() {
+    String isRangeCompact = getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
+        CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
+    return Boolean.parseBoolean(isRangeCompact);
+  }
+
   private void validateSortMemorySpillPercentage() {
     String spillPercentageStr = carbonProperties.getProperty(
         CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
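 
Because CARBON_ENABLE_RANGE_COMPACTION_DEFAULT is "false", the new flow stays
disabled until the property is set. A minimal opt-in sketch (addProperty is
CarbonProperties' existing setter; the constant comes from the hunk above):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable the range-column compaction flow guarded by isRangeCompactionAllowed
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION, "true")
    assert(CarbonProperties.getInstance().isRangeCompactionAllowed)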
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 5d6730f..165e4f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -610,6 +610,34 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     sql("DROP TABLE IF EXISTS carbon_range_column1")
   }
 
+  test("Test compaction for range_column - STRING Datatype null values") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    deleteFile(filePath2)
+    createFile(filePath2, 20, 14)
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='city',
+        | 'range_column'='city')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('BAD_RECORDS_ACTION'='FORCE','HEADER'='false')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('BAD_RECORDS_ACTION'='FORCE','HEADER'='false')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    deleteFile(filePath2)
+  }
+
   test("Test compaction for range_column - STRING Datatype min/max not stored") {
     deleteFile(filePath2)
     createFile(filePath2, 1000, 7)
@@ -930,12 +958,24 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
             .println(
               100 + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
         }
-      } else if (9 <= lastCol) {
+      } else if (9 <= lastCol && 13 >= lastCol) {
         for (i <- lastCol until (lastCol + line)) {
           write
             .println(
               i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
         }
+      } else if (14 == lastCol) {
+        // Null data generation for string col
+        for (i <- lastCol until (lastCol + line)) {
+          if (i % 3 != 0) {
+            write
+              .println(
+                i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+          } else {
+            write
+              .println(i + ",")
+          }
+        }
       }
       write.close()
     } catch {
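 
With lastCol == 14, the new branch above writes a truncated row for every i
divisible by 3, so name, city, and age parse as null once
BAD_RECORDS_ACTION='FORCE' is applied. The first few generated lines look like:

    14,n14,c14,2004
    15,
    16,n16,c16,2006
    17,n17,c17,2007
    18,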
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index a751887..81699b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -311,7 +311,7 @@ object DataLoadProcessBuilderOnSpark {
         // better to generate a CarbonData file for each partition
         val totalSize = model.getTotalSize.toDouble
         val table = model.getCarbonDataLoadSchema.getCarbonTable
-        numPartitions = getNumPatitionsBasedOnSize(totalSize, table, model)
+        numPartitions = getNumPatitionsBasedOnSize(totalSize, table, model, false)
       }
     }
     numPartitions
@@ -319,10 +319,13 @@ object DataLoadProcessBuilderOnSpark {
 
   def getNumPatitionsBasedOnSize(totalSize: Double,
       table: CarbonTable,
-      model: CarbonLoadModel): Int = {
+      model: CarbonLoadModel,
+      mergerFlag: Boolean): Int = {
     val blockSize = 1024L * 1024 * table.getBlockSizeInMB
     val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
-    val scaleFactor = if (model.getScaleFactor == 0) {
+    val scaleFactor = if (mergerFlag) {
+      1
+    } else if (model.getScaleFactor == 0) {
       // use system properties
       CarbonProperties.getInstance().getRangeColumnScaleFactor
     } else {
@@ -385,6 +388,11 @@ class ByteArrayOrdering() extends Ordering[Object] {
 
 class StringOrdering() extends Ordering[Object] {
   override def compare(x: Object, y: Object): Int = {
-    (x.asInstanceOf[UTF8String]).compare(y.asInstanceOf[UTF8String])
+    if (x == null) {
+      return -1
+    } else if (y == null) {
+      return 1
+    }
+    return (x.asInstanceOf[UTF8String]).compare(y.asInstanceOf[UTF8String])
   }
 }
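 
With the patched comparator, a null sorts ahead of every non-null value. A
small sketch of the resulting order (assuming Spark's UTF8String on the
classpath; note that compare(null, null) still returns -1 under this patch):

    import org.apache.spark.unsafe.types.UTF8String

    val ord = new StringOrdering()
    val data = Array[Object](UTF8String.fromString("b"), null, UTF8String.fromString("a"))
    // Null compares smaller than any string, so it lands first
    println(data.sorted(ord).mkString("[", ", ", "]")) // prints [null, a, b]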
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c143f93..4f4386b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.core.scan.expression
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -297,7 +297,8 @@ class CarbonMergerRDD[K, V](
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var rangeColumn: CarbonColumn = null
-    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+    if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
+        !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       // If the table is not a partition table then only we go for range column compaction flow
       rangeColumn = carbonTable.getRangeColumn
     }
@@ -395,7 +396,7 @@ class CarbonMergerRDD[K, V](
       // To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
       val numOfPartitions = Math
         .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
-          .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel))
+          .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, true))
       val colName = rangeColumn.getColName
       LOGGER.info(s"Compacting on range column: $colName")
       allRanges = getRangesFromRDD(rangeColumn,