Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:10:25 UTC

[carbondata] branch master updated: [CARBONDATA-3432] Added property for enable/disable Range column compaction and broadcast all splits

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cf0043c  [CARBONDATA-3432] Added property for enable/disable Range column compaction and broadcast all splits
cf0043c is described below

commit cf0043c870cda89834e90459f31e3d83d640f875
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Thu Jun 13 10:11:41 2019 +0530

    [CARBONDATA-3432] Added property for enable/disable Range column compaction and broadcast all splits
    
    Added a property to enable/disable range column compaction; its default value is true.
    Instead of sending all splits to every executor one by one, broadcast all splits once from the driver.
    
    This closes #3284
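
For reference, the new flag can be set like any other CarbonData property,
either in carbon.properties or at runtime. A minimal sketch, assuming the
standard CarbonProperties.getInstance().addProperty API (the key itself,
carbon.enable.range.compaction, comes from the diff below):

    import org.apache.carbondata.core.util.CarbonProperties

    // Disable range column compaction; compaction then falls back to the
    // non-range flow even if the table defines a range column.
    CarbonProperties.getInstance()
      .addProperty("carbon.enable.range.compaction", "false")

A standalone sketch of the broadcast pattern itself follows the full diff.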
---
 .../core/constants/CarbonCommonConstants.java      |  4 ++
 .../carbondata/core/util/CarbonProperties.java     |  6 +++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 53 ++++++++++++++++------
 3 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6833c8a..6888f8e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1205,6 +1205,10 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
 
+  public static final String CARBON_ENABLE_RANGE_COMPACTION = "carbon.enable.range.compaction";
+
+  public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "true";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Query parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index ec8a1c9..5868664 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1560,6 +1560,12 @@ public final class CarbonProperties {
     return Boolean.parseBoolean(pushFilters);
   }
 
+  public boolean isRangeCompactionAllowed() {
+    String isRangeCompact = getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
+        CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
+    return Boolean.parseBoolean(isRangeCompact);
+  }
+
   private void validateSortMemorySpillPercentage() {
     String spillPercentageStr = carbonProperties.getProperty(
         CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 8935b5b..febaeca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -29,6 +29,7 @@ import scala.reflect.classTag
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
 import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -85,14 +86,20 @@ class CarbonMergerRDD[K, V](
   val databaseName = carbonMergerMapping.databaseName
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
+  var rangeColumn: CarbonColumn = null
+  var singleRange = false
   var expressionMapForRangeCol: util.Map[Integer, Expression] = null
+  var broadCastSplits: Broadcast[util.List[CarbonInputSplit]] = null
+
+  def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = {
+    broadCastSplits = sparkContext.broadcast(splits)
+  }
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val queryStartTime = System.currentTimeMillis()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val rangeColumn = carbonTable.getRangeColumn
       val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
       if (carbonTable.isPartitionTable) {
         carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
@@ -112,7 +119,14 @@ class CarbonMergerRDD[K, V](
       var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
       try {
         // sorting the table block info List.
-        val splitList = carbonSparkPartition.split.value.getAllSplits
+        val splitList = if (null == rangeColumn || singleRange) {
+          // For a non-range column, or a range column with a single unique
+          // value, use the per-partition splits; only the multi-value range
+          // column path reads the broadcast splits (which hold all splits).
+          carbonSparkPartition.split.value.getAllSplits
+        } else {
+          broadCastSplits.value
+        }
         val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
 
         Collections.sort(tableBlockInfoList)
@@ -296,8 +310,8 @@ class CarbonMergerRDD[K, V](
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    var rangeColumn: CarbonColumn = null
-    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+    if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
+        !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       // If the table is not a partition table then only we go for range column compaction flow
       rangeColumn = carbonTable.getRangeColumn
     }
@@ -396,7 +410,6 @@ class CarbonMergerRDD[K, V](
     totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)
-    var singleRange = false
     if (rangeColumn != null) {
       // Calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
       // We take the minimum of average number of tasks created during load time and the number
@@ -557,6 +570,11 @@ class CarbonMergerRDD[K, V](
           }
         }
       }
+      if (null != rangeColumn) {
+        // Broadcast all splits to all executors only when the range column
+        // has more than one unique value.
+        makeBroadCast(carbonInputSplits.asJava)
+      }
     }
     val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
     // update cardinality and column schema list according to master schema
@@ -599,9 +617,16 @@ class CarbonMergerRDD[K, V](
 
         if (blockletCount != 0) {
           val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
-          val multiBlockSplit = new CarbonMultiBlockSplit(
-            taskInfo.getCarbonInputSplitList,
-            Array(nodeName))
+          val multiBlockSplit = if (null == rangeColumn || singleRange) {
+            new CarbonMultiBlockSplit(
+              taskInfo.getCarbonInputSplitList,
+              Array(nodeName))
+          } else {
+            var splitListForRange = new util.ArrayList[CarbonInputSplit]()
+            new CarbonMultiBlockSplit(
+              splitListForRange,
+              Array(nodeName))
+          }
           if (isPartitionTable) {
             carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
           }
@@ -630,11 +655,13 @@ class CarbonMergerRDD[K, V](
     logInfo(s"Identified  no.of.Blocks: $noOfBlocks," +
             s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
     logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
-    for (j <- 0 until result.size) {
-      val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
-      val splitList = multiBlockSplit.getAllSplits
-      logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
-              s"${ CarbonInputSplit.createBlocks(splitList).size }")
+    if (rangeColumn == null) {
+      for (j <- 0 until result.size) {
+        val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+        val splitList = multiBlockSplit.getAllSplits
+        logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
+                s"${ CarbonInputSplit.createBlocks(splitList).size }")
+      }
     }
     result.toArray(new Array[Partition](result.size))
   }
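
To make the broadcast change concrete, here is a minimal, self-contained
sketch of the same driver-side pattern. Split, the local master URL, and the
toy data are hypothetical stand-ins for illustration, not CarbonData code:

    import java.util

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    // Hypothetical stand-in for CarbonInputSplit.
    case class Split(path: String)

    object BroadcastSplitsSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "broadcast-splits-sketch")
        val allSplits: util.List[Split] =
          util.Arrays.asList(Split("a"), Split("b"))

        // Driver side: broadcast the complete split list once, as
        // CarbonMergerRDD.makeBroadCast does in the diff above.
        val broadcastSplits: Broadcast[util.List[Split]] = sc.broadcast(allSplits)

        // Executor side: each task dereferences the broadcast value instead
        // of receiving its own serialized copy of the full list.
        val sizes = sc.parallelize(1 to 4, numSlices = 4)
          .map(_ => broadcastSplits.value.size())
          .collect()
        println(sizes.mkString(","))
        sc.stop()
      }
    }

Broadcasting once trades per-task closure serialization for a single
efficient distribution from the driver, which matters when the split list
is large and every range task needs the complete list.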