Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/15 11:10:43 UTC
[carbondata] branch master updated: [CARBONDATA-3375][CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1241fee [CARBONDATA-3375] [CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue
1241fee is described below
commit 1241feec97cb4653ec2694290c652bc8bc765d0d
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Wed May 8 18:28:21 2019 +0530
[CARBONDATA-3375] [CARBONDATA-3376] Fix GC Overhead limit exceeded issue and partition column as range column issue
Problem 1: When the range column contains only a single data value, compaction is launched as one single task, which results in one executor getting overloaded.
Solution: When there is only a single range, divide the splits among different tasks so that no single executor is overloaded, as in the sketch below.
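As an illustration only (not the patch itself), here is a minimal self-contained Scala sketch of the split-distribution idea; the names distributeSplits and perTask are hypothetical, while defaultParallelism mirrors the variable used in CarbonMergerRDD below:

    // Hypothetical sketch: spread the splits across the available parallelism
    // instead of packing them all into one task when only a single range exists.
    def distributeSplits[T](splits: Seq[T], defaultParallelism: Int): Map[String, Seq[T]] = {
      // ceil over a Double so the last task picks up the remainder
      val perTask = math.max(1, math.ceil(splits.size.toDouble / defaultParallelism).toInt)
      splits.grouped(perTask)
        .zipWithIndex
        .map { case (chunk, taskId) => taskId.toString -> chunk }
        .toMap
    }

    // e.g. distributeSplits(1 to 10, 4) produces four tasks holding
    // 3, 3, 3 and 1 splits instead of one task holding all ten.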
Problem 2: When the range column is also the column the table is partitioned by, compaction fails because it enters the range-column flow.
Solution: Added a check for partition tables when a range column is present, so that they go through the old flow and compaction passes; see the sketch below.
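The guard is conceptually tiny; a hypothetical condensed model of the decision follows (the actual patch checks isHivePartitionTable on the CarbonTable before calling getRangeColumn, as the first hunk of the diff shows):

    // Hypothetical model of the flow decision: a partitioned table never takes
    // the range-column compaction path, even if a range column is configured.
    final case class TableInfo(isHivePartitionTable: Boolean, rangeColumn: Option[String])

    def effectiveRangeColumn(table: TableInfo): Option[String] =
      if (table.isHivePartitionTable) None else table.rangeColumn

    // effectiveRangeColumn(TableInfo(isHivePartitionTable = true, rangeColumn = Some("age")))
    // returns None, so such a table falls back to the old (non-range) compaction flow.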
This closes #3210
---
.../dataload/TestRangeColumnDataLoad.scala | 25 +++
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 168 +++++++++++++--------
2 files changed, 131 insertions(+), 62 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index ff383f9..5d6730f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -187,6 +187,31 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
sql("DROP TABLE IF EXISTS carbon_range_column1")
}
+ test("Test compaction for range_column - Partition Column") {
+ sql("DROP TABLE IF EXISTS carbon_range_column1")
+ sql(
+ """
+ | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING)
+ | PARTITIONED BY (age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='age, city',
+ | 'range_column'='age')
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+ "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+ "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+ var res = sql("select * from carbon_range_column1").collect()
+
+ sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+ checkAnswer(sql("select * from carbon_range_column1"), res)
+ sql("DROP TABLE IF EXISTS carbon_range_column1")
+ }
+
test("Test compaction for range_column - 2 levels") {
sql("DROP TABLE IF EXISTS carbon_range_column1")
sql(
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index e361c14..c143f93 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -296,7 +296,11 @@ class CarbonMergerRDD[K, V](
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val rangeColumn = carbonTable.getRangeColumn
+ var rangeColumn: CarbonColumn = null
+ if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+ // Only if the table is not a partition table do we take the range column compaction flow
+ rangeColumn = carbonTable.getRangeColumn
+ }
val dataType: DataType = if (null != rangeColumn) {
rangeColumn.getDataType
} else {
@@ -386,6 +390,7 @@ class CarbonMergerRDD[K, V](
}
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)
+ var singleRange = false
if (rangeColumn != null) {
// To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
val numOfPartitions = Math
@@ -400,10 +405,14 @@ class CarbonMergerRDD[K, V](
dataType)
// If RangePartitioner does not give ranges in the case when the data is skewed with
// a lot of null records then we take the min/max from footer and set them for tasks
- if (null == allRanges || (allRanges.size == 1 && allRanges(0) == null)) {
+ if (null == allRanges || allRanges.size == 1) {
allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
rangeColumn,
isRangeColSortCol)
+ if (allRanges(0) == allRanges(1)) {
+ // This will be true only if the data contains a single value throughout
+ singleRange = true
+ }
}
LOGGER.info(s"Number of ranges:" + allRanges.size)
}
@@ -433,75 +442,110 @@ class CarbonMergerRDD[K, V](
val newRanges = allRanges.filter { range =>
range != null
}
- carbonInputSplits.foreach { split =>
- var dataFileFooter: DataFileFooter = null
- if (null == rangeColumn) {
- val taskNo = getTaskNo(split, partitionTaskMap, counter)
- var sizeOfSplit = split.getDetailInfo.getBlockSize
- val splitList = taskIdMapping.get(taskNo)
- noOfBlocks += 1
+ val noOfSplitsPerTask = Math.ceil(carbonInputSplits.size.toDouble / defaultParallelism)
+ var taskCount = 0
+ // If the range column has only one data value, divide the splits among
+ // different tasks to avoid creating a single task and overloading one executor
+ if (singleRange) {
+ carbonInputSplits.foreach { split =>
+ var dataFileFooter: DataFileFooter = null
+ try {
+ dataFileFooter = CarbonUtil.readMetadataFile(
+ CarbonInputSplit.getTableBlockInfo(split))
+ } catch {
+ case e: IOException =>
+ logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+ throw e
+ }
+ // add all the column and cardinality to the map
+ CarbonCompactionUtil
+ .addColumnCardinalityToMap(columnToCardinalityMap,
+ dataFileFooter.getColumnInTable,
+ dataFileFooter.getSegmentInfo.getColumnCardinality)
+
+ var splitList = taskIdMapping.get(taskCount.toString)
+ if (null != splitList && splitList.size == noOfSplitsPerTask) {
+ taskCount = taskCount + 1
+ splitList = taskIdMapping.get(taskCount.toString)
+ }
if (null == splitList) {
- val splitTempList = new util.ArrayList[CarbonInputSplit]()
- splitTempList.add(split)
- taskIdMapping.put(taskNo, splitTempList)
- } else {
- splitList.add(split)
+ splitList = new util.ArrayList[CarbonInputSplit]()
+ taskIdMapping.put(taskCount.toString, splitList)
}
+ splitList.add(split)
}
- // Check the cardinality of each columns and set the highest.
- try {
- dataFileFooter = CarbonUtil.readMetadataFile(
- CarbonInputSplit.getTableBlockInfo(split))
- } catch {
- case e: IOException =>
- logError("Exception in preparing the data file footer for compaction " + e.getMessage)
- throw e
- }
- // add all the column and cardinality to the map
- CarbonCompactionUtil
- .addColumnCardinalityToMap(columnToCardinalityMap,
- dataFileFooter.getColumnInTable,
- dataFileFooter.getSegmentInfo.getColumnCardinality)
-
- // Create taskIdMapping here for range column by reading min/max values.
- if (null != rangeColumn) {
- if (null == expressionMapForRangeCol) {
- expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
+ } else {
+ carbonInputSplits.foreach { split =>
+ var dataFileFooter: DataFileFooter = null
+ if (null == rangeColumn) {
+ val taskNo = getTaskNo(split, partitionTaskMap, counter)
+ var sizeOfSplit = split.getDetailInfo.getBlockSize
+ val splitList = taskIdMapping.get(taskNo)
+ noOfBlocks += 1
+ if (null == splitList) {
+ val splitTempList = new util.ArrayList[CarbonInputSplit]()
+ splitTempList.add(split)
+ taskIdMapping.put(taskNo, splitTempList)
+ } else {
+ splitList.add(split)
+ }
+ }
+ // Check the cardinality of each columns and set the highest.
+ try {
+ dataFileFooter = CarbonUtil.readMetadataFile(
+ CarbonInputSplit.getTableBlockInfo(split))
+ } catch {
+ case e: IOException =>
+ logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+ throw e
}
- if (-1 == indexOfRangeColumn) {
- val allColumns = dataFileFooter.getColumnInTable
- for (i <- 0 until allColumns.size()) {
- if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
- indexOfRangeColumn = i
+ // add all the column and cardinality to the map
+ CarbonCompactionUtil
+ .addColumnCardinalityToMap(columnToCardinalityMap,
+ dataFileFooter.getColumnInTable,
+ dataFileFooter.getSegmentInfo.getColumnCardinality)
+
+ // Create taskIdMapping here for range column by reading min/max values.
+ if (null != rangeColumn) {
+ if (null == expressionMapForRangeCol) {
+ expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
+ }
+ if (-1 == indexOfRangeColumn) {
+ val allColumns = dataFileFooter.getColumnInTable
+ for (i <- 0 until allColumns.size()) {
+ if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
+ indexOfRangeColumn = i
+ }
}
}
- }
- // Create ranges and add splits to the tasks
- for (i <- 0 until (newRanges.size + 1)) {
- if (null == expressionMapForRangeCol.get(i)) {
- // Creating FilterExpression for the range column
- var minVal: Object = null
- var maxVal: Object = null
- // For first task we will create an Or Filter and also accommodate null values
- // For last task we will take as GreaterThan Expression of last value
- if (i != 0) {
- minVal = newRanges(i - 1)
+ // Create ranges and add splits to the tasks
+ for (i <- 0 until (newRanges.size + 1)) {
+ if (null == expressionMapForRangeCol.get(i)) {
+ // Creating FilterExpression for the range column
+ var minVal: Object = null
+ var maxVal: Object = null
+ // For first task we will create an Or Filter and also accommodate null values
+ // For last task we will take as GreaterThan Expression of last value
+ if (i != 0) {
+ minVal = newRanges(i - 1)
+ }
+ if (i != newRanges.size) {
+ maxVal = newRanges(i)
+ }
+ val filterExpr = CarbonCompactionUtil
+ .getFilterExpressionForRange(rangeColumn,
+ minVal, maxVal, dataType)
+ expressionMapForRangeCol.put(i, filterExpr)
}
- if (i != newRanges.size) {
- maxVal = newRanges(i)
+ var splitList = taskIdMapping.get(i.toString)
+ noOfBlocks += 1
+ if (null == splitList) {
+ splitList = new util.ArrayList[CarbonInputSplit]()
+ taskIdMapping.put(i.toString, splitList)
}
- val filterExpr = CarbonCompactionUtil
- .getFilterExpressionForRange(rangeColumn,
- minVal, maxVal, dataType)
- expressionMapForRangeCol.put(i, filterExpr)
- }
- var splitList = taskIdMapping.get(i.toString)
- noOfBlocks += 1
- if (null == splitList) {
- splitList = new util.ArrayList[CarbonInputSplit]()
- taskIdMapping.put(i.toString, splitList)
+ splitList.add(split)
}
- splitList.add(split)
}
}
}