You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:10:25 UTC
[carbondata] branch master updated: [CARBONDATA-3432] Added
property for enable/disable Range column compaction and broadcast all
splits
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new cf0043c [CARBONDATA-3432] Added property for enable/disable Range column compaction and broadcast all splits
cf0043c is described below
commit cf0043c870cda89834e90459f31e3d83d640f875
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Thu Jun 13 10:11:41 2019 +0530
[CARBONDATA-3432] Added property for enable/disable Range column compaction and broadcast all splits
Added a property to enable/disable Range column compaction, whose default value is true.
Instead of sending all splits to all executors one by one, broadcast all splits from the driver.
This closes #3284
---
.../core/constants/CarbonCommonConstants.java | 4 ++
.../carbondata/core/util/CarbonProperties.java | 6 +++
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 53 ++++++++++++++++------
3 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6833c8a..6888f8e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1205,6 +1205,10 @@ public final class CarbonCommonConstants {
public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
+ public static final String CARBON_ENABLE_RANGE_COMPACTION = "carbon.enable.range.compaction";
+
+ public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "true";
+
//////////////////////////////////////////////////////////////////////////////////////////
// Query parameter start here
//////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index ec8a1c9..5868664 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1560,6 +1560,12 @@ public final class CarbonProperties {
return Boolean.parseBoolean(pushFilters);
}
+ public boolean isRangeCompactionAllowed() {
+ String isRangeCompact = getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
+ CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
+ return Boolean.parseBoolean(isRangeCompact);
+ }
+
private void validateSortMemorySpillPercentage() {
String spillPercentageStr = carbonProperties.getProperty(
CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 8935b5b..febaeca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -29,6 +29,7 @@ import scala.reflect.classTag
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -85,14 +86,20 @@ class CarbonMergerRDD[K, V](
val databaseName = carbonMergerMapping.databaseName
val factTableName = carbonMergerMapping.factTableName
val tableId = carbonMergerMapping.tableId
+ var rangeColumn: CarbonColumn = null
+ var singleRange = false
var expressionMapForRangeCol: util.Map[Integer, Expression] = null
+ var broadCastSplits: Broadcast[util.List[CarbonInputSplit]] = null
+
+ def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = {
+ broadCastSplits = sparkContext.broadcast(splits)
+ }
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val queryStartTime = System.currentTimeMillis()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val rangeColumn = carbonTable.getRangeColumn
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
if (carbonTable.isPartitionTable) {
carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
@@ -112,7 +119,14 @@ class CarbonMergerRDD[K, V](
var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
try {
// sorting the table block info List.
- val splitList = carbonSparkPartition.split.value.getAllSplits
+ val splitList = if (null == rangeColumn || singleRange) {
+ // In case of non-range column or single value inside the range column we do not use
+ // the broadcast splits, only for range column we use the broadcast splits(which have
+ // all the splits)
+ carbonSparkPartition.split.value.getAllSplits
+ } else {
+ broadCastSplits.value
+ }
val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
Collections.sort(tableBlockInfoList)
@@ -296,8 +310,8 @@ class CarbonMergerRDD[K, V](
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- var rangeColumn: CarbonColumn = null
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+ if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
+ !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
// If the table is not a partition table then only we go for range column compaction flow
rangeColumn = carbonTable.getRangeColumn
}
@@ -396,7 +410,6 @@ class CarbonMergerRDD[K, V](
totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)
- var singleRange = false
if (rangeColumn != null) {
// Calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
// We take the minimum of average number of tasks created during load time and the number
@@ -557,6 +570,11 @@ class CarbonMergerRDD[K, V](
}
}
}
+ if (null != rangeColumn) {
+ // Broadcast all splits to all the executors only in case of range column
+ // having more than 1 unique value.
+ makeBroadCast(carbonInputSplits.asJava)
+ }
}
val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
// update cardinality and column schema list according to master schema
@@ -599,9 +617,16 @@ class CarbonMergerRDD[K, V](
if (blockletCount != 0) {
val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
- val multiBlockSplit = new CarbonMultiBlockSplit(
- taskInfo.getCarbonInputSplitList,
- Array(nodeName))
+ val multiBlockSplit = if (null == rangeColumn || singleRange) {
+ new CarbonMultiBlockSplit(
+ taskInfo.getCarbonInputSplitList,
+ Array(nodeName))
+ } else {
+ var splitListForRange = new util.ArrayList[CarbonInputSplit]()
+ new CarbonMultiBlockSplit(
+ splitListForRange,
+ Array(nodeName))
+ }
if (isPartitionTable) {
carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
}
@@ -630,11 +655,13 @@ class CarbonMergerRDD[K, V](
logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
- for (j <- 0 until result.size) {
- val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
- val splitList = multiBlockSplit.getAllSplits
- logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
- s"${ CarbonInputSplit.createBlocks(splitList).size }")
+ if (rangeColumn == null) {
+ for (j <- 0 until result.size) {
+ val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+ val splitList = multiBlockSplit.getAllSplits
+ logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
+ s"${ CarbonInputSplit.createBlocks(splitList).size }")
+ }
}
result.toArray(new Array[Partition](result.size))
}