You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/15 11:10:43 UTC

[carbondata] branch master updated: [CARBONDATA-3375] [CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1241fee  [CARBONDATA-3375] [CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue
1241fee is described below

commit 1241feec97cb4653ec2694290c652bc8bc765d0d
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Wed May 8 18:28:21 2019 +0530

    [CARBONDATA-3375] [CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue
    
    Problem1 : When only a single data value is present, compaction is launched as one single task, which results in one executor getting overloaded.
    
    Solution: When there is only a single range, we divide the splits among different tasks to ensure that no single executor is overloaded.
    
    Problem2 : When the range column is also the partition column, compaction fails because it goes through the Range Column flow.
    
    Solution: Added a check for Partition Table when range column is present so that it goes through the old flow and compaction passes.
    
    This closes #3210
---
 .../dataload/TestRangeColumnDataLoad.scala         |  25 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 168 +++++++++++++--------
 2 files changed, 131 insertions(+), 62 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index ff383f9..5d6730f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -187,6 +187,31 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     sql("DROP TABLE IF EXISTS carbon_range_column1")
   }
 
+  test("Test compaction for range_column - Partition Column") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING)
+        | PARTITIONED BY (age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='age, city',
+        | 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
   test("Test compaction for range_column - 2 levels") {
     sql("DROP TABLE IF EXISTS carbon_range_column1")
     sql(
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index e361c14..c143f93 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -296,7 +296,11 @@ class CarbonMergerRDD[K, V](
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val rangeColumn = carbonTable.getRangeColumn
+    var rangeColumn: CarbonColumn = null
+    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      // If the table is not a partition table then only we go for range column compaction flow
+      rangeColumn = carbonTable.getRangeColumn
+    }
     val dataType: DataType = if (null != rangeColumn) {
       rangeColumn.getDataType
     } else {
@@ -386,6 +390,7 @@ class CarbonMergerRDD[K, V](
     }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)
+    var singleRange = false
     if (rangeColumn != null) {
       // To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
       val numOfPartitions = Math
@@ -400,10 +405,14 @@ class CarbonMergerRDD[K, V](
         dataType)
       // If RangePartitioner does not give ranges in the case when the data is skewed with
       // a lot of null records then we take the min/max from footer and set them for tasks
-      if (null == allRanges || (allRanges.size == 1 && allRanges(0) == null)) {
+      if (null == allRanges || allRanges.size == 1) {
         allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
           rangeColumn,
           isRangeColSortCol)
+        if(allRanges(0) == allRanges(1)) {
+          // This will be true only if data has single values throughout
+          singleRange = true
+        }
       }
       LOGGER.info(s"Number of ranges:" + allRanges.size)
     }
@@ -433,75 +442,110 @@ class CarbonMergerRDD[K, V](
     val newRanges = allRanges.filter { range =>
       range != null
     }
-    carbonInputSplits.foreach { split =>
-      var dataFileFooter: DataFileFooter = null
-      if (null == rangeColumn) {
-        val taskNo = getTaskNo(split, partitionTaskMap, counter)
-        var sizeOfSplit = split.getDetailInfo.getBlockSize
-        val splitList = taskIdMapping.get(taskNo)
-        noOfBlocks += 1
+    val noOfSplitsPerTask = Math.ceil(carbonInputSplits.size / defaultParallelism)
+    var taskCount = 0
+    // In case of range column if only one data value is present then we try to
+    // divide the splits to different tasks in order to avoid single task creation
+    // and load on single executor
+    if (singleRange) {
+      carbonInputSplits.foreach { split =>
+        var dataFileFooter: DataFileFooter = null
+        try {
+          dataFileFooter = CarbonUtil.readMetadataFile(
+            CarbonInputSplit.getTableBlockInfo(split))
+        } catch {
+          case e: IOException =>
+            logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+            throw e
+        }
+        // add all the column and cardinality to the map
+        CarbonCompactionUtil
+          .addColumnCardinalityToMap(columnToCardinalityMap,
+            dataFileFooter.getColumnInTable,
+            dataFileFooter.getSegmentInfo.getColumnCardinality)
+
+        var splitList = taskIdMapping.get(taskCount.toString)
+        if (null != splitList && splitList.size == noOfSplitsPerTask) {
+          taskCount = taskCount + 1
+          splitList = taskIdMapping.get(taskCount.toString)
+        }
         if (null == splitList) {
-          val splitTempList = new util.ArrayList[CarbonInputSplit]()
-          splitTempList.add(split)
-          taskIdMapping.put(taskNo, splitTempList)
-        } else {
-          splitList.add(split)
+          splitList = new util.ArrayList[CarbonInputSplit]()
+          taskIdMapping.put(taskCount.toString, splitList)
         }
+        splitList.add(split)
       }
-      // Check the cardinality of each columns and set the highest.
-      try {
-        dataFileFooter = CarbonUtil.readMetadataFile(
-          CarbonInputSplit.getTableBlockInfo(split))
-      } catch {
-        case e: IOException =>
-          logError("Exception in preparing the data file footer for compaction " + e.getMessage)
-          throw e
-      }
-      // add all the column and cardinality to the map
-      CarbonCompactionUtil
-        .addColumnCardinalityToMap(columnToCardinalityMap,
-          dataFileFooter.getColumnInTable,
-          dataFileFooter.getSegmentInfo.getColumnCardinality)
-
-      // Create taskIdMapping here for range column by reading min/max values.
-      if (null != rangeColumn) {
-        if (null == expressionMapForRangeCol) {
-          expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
+    } else {
+      carbonInputSplits.foreach { split =>
+        var dataFileFooter: DataFileFooter = null
+        if (null == rangeColumn) {
+          val taskNo = getTaskNo(split, partitionTaskMap, counter)
+          var sizeOfSplit = split.getDetailInfo.getBlockSize
+          val splitList = taskIdMapping.get(taskNo)
+          noOfBlocks += 1
+          if (null == splitList) {
+            val splitTempList = new util.ArrayList[CarbonInputSplit]()
+            splitTempList.add(split)
+            taskIdMapping.put(taskNo, splitTempList)
+          } else {
+            splitList.add(split)
+          }
+        }
+        // Check the cardinality of each columns and set the highest.
+        try {
+          dataFileFooter = CarbonUtil.readMetadataFile(
+            CarbonInputSplit.getTableBlockInfo(split))
+        } catch {
+          case e: IOException =>
+            logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+            throw e
         }
-        if (-1 == indexOfRangeColumn) {
-          val allColumns = dataFileFooter.getColumnInTable
-          for (i <- 0 until allColumns.size()) {
-            if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
-              indexOfRangeColumn = i
+        // add all the column and cardinality to the map
+        CarbonCompactionUtil
+          .addColumnCardinalityToMap(columnToCardinalityMap,
+            dataFileFooter.getColumnInTable,
+            dataFileFooter.getSegmentInfo.getColumnCardinality)
+
+        // Create taskIdMapping here for range column by reading min/max values.
+        if (null != rangeColumn) {
+          if (null == expressionMapForRangeCol) {
+            expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
+          }
+          if (-1 == indexOfRangeColumn) {
+            val allColumns = dataFileFooter.getColumnInTable
+            for (i <- 0 until allColumns.size()) {
+              if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
+                indexOfRangeColumn = i
+              }
             }
           }
-        }
-        // Create ranges and add splits to the tasks
-        for (i <- 0 until (newRanges.size + 1)) {
-          if (null == expressionMapForRangeCol.get(i)) {
-            // Creating FilterExpression for the range column
-            var minVal: Object = null
-            var maxVal: Object = null
-            // For first task we will create an Or Filter and also accomodate null values
-            // For last task we will take as GreaterThan Expression of last value
-            if (i != 0) {
-              minVal = newRanges(i - 1)
+          // Create ranges and add splits to the tasks
+          for (i <- 0 until (newRanges.size + 1)) {
+            if (null == expressionMapForRangeCol.get(i)) {
+              // Creating FilterExpression for the range column
+              var minVal: Object = null
+              var maxVal: Object = null
+              // For first task we will create an Or Filter and also accomodate null values
+              // For last task we will take as GreaterThan Expression of last value
+              if (i != 0) {
+                minVal = newRanges(i - 1)
+              }
+              if (i != newRanges.size) {
+                maxVal = newRanges(i)
+              }
+              val filterExpr = CarbonCompactionUtil
+                .getFilterExpressionForRange(rangeColumn,
+                  minVal, maxVal, dataType)
+              expressionMapForRangeCol.put(i, filterExpr)
             }
-            if (i != newRanges.size) {
-              maxVal = newRanges(i)
+            var splitList = taskIdMapping.get(i.toString)
+            noOfBlocks += 1
+            if (null == splitList) {
+              splitList = new util.ArrayList[CarbonInputSplit]()
+              taskIdMapping.put(i.toString, splitList)
             }
-            val filterExpr = CarbonCompactionUtil
-              .getFilterExpressionForRange(rangeColumn,
-                minVal, maxVal, dataType)
-            expressionMapForRangeCol.put(i, filterExpr)
-          }
-          var splitList = taskIdMapping.get(i.toString)
-          noOfBlocks += 1
-          if (null == splitList) {
-            splitList = new util.ArrayList[CarbonInputSplit]()
-            taskIdMapping.put(i.toString, splitList)
+            splitList.add(split)
           }
-          splitList.add(split)
         }
       }
     }