You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/29 06:40:55 UTC

[carbondata] branch master updated: [CARBONDATA-3396] Range Compaction Data Mismatch Fix

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ce40c64  [CARBONDATA-3396] Range Compaction Data Mismatch Fix
ce40c64 is described below

commit ce40c64f552d02417400111e9865ff77a05d4fbd
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon May 27 11:41:10 2019 +0530

    [CARBONDATA-3396] Range Compaction Data Mismatch Fix
    
    Problem : When the data has to be compacted a second time and the ranges made the first time have data in more than one file/blocklet, then while compacting the second time, if the first blocklet does not contain any record, the other files are also skipped. Also, Global Sort and Local Sort with a Range Column were taking different times for the same data load and compaction, because during the write step only 1 core was given to Global Sort.
    
    Solution : For the first issue, we now read all the blocklets of a given range and break only when the batch size is full. For the second issue, in the case of a range column, both sort scopes now take the same number of cores and behave similarly.
    
    Also changed the number of tasks to be launched during the compaction, now based on the number of tasks during load.
    
    This closes #3233
---
 .../core/constants/CarbonCommonConstants.java      |  4 ----
 .../AbstractDetailQueryResultIterator.java         | 14 +------------
 .../scan/result/iterator/RawResultIterator.java    | 11 +++++++++--
 .../carbondata/core/util/CarbonProperties.java     | 23 ++++++++++++++++------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 18 ++++++++++++-----
 .../processing/merger/CarbonCompactionUtil.java    | 11 +++++++++++
 .../store/CarbonFactDataHandlerModel.java          |  3 ++-
 7 files changed, 53 insertions(+), 31 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e78ea17..aa9dd05 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1193,10 +1193,6 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
 
-  public static final String CARBON_ENABLE_RANGE_COMPACTION = "carbon.enable.range.compaction";
-
-  public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "false";
-
   //////////////////////////////////////////////////////////////////////////////////////////
   // Query parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index f39e549..d7f2c0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -89,18 +88,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
 
   AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
       ExecutorService execService) {
-    String batchSizeString =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
-    if (null != batchSizeString) {
-      try {
-        batchSize = Integer.parseInt(batchSizeString);
-      } catch (NumberFormatException ne) {
-        LOGGER.error("Invalid inmemory records size. Using default value");
-        batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-      }
-    } else {
-      batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-    }
+    batchSize = CarbonProperties.getQueryBatchSize();
     this.recorder = queryModel.getStatisticsRecorder();
     this.blockExecutionInfos = infos;
     this.fileReader = FileFactory.getFileHolder(
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 4d471b6..911a7dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -59,6 +59,10 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
   private boolean isBackupFilled = false;
 
   /**
+   * batch size used while fetching rows from the detail query result iterator
+   */
+  private int batchSize;
+  /**
    * LOGGER
    */
   private static final Logger LOGGER =
@@ -71,7 +75,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
     this.executorService = Executors.newFixedThreadPool(1);
-
+    batchSize = CarbonProperties.getQueryBatchSize();
     if (init) {
       init();
     }
@@ -116,10 +120,13 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
 
   private List<Object[]> fetchRows() throws Exception {
     List<Object[]> converted = new ArrayList<>();
-    if (detailRawQueryResultIterator.hasNext()) {
+    while (detailRawQueryResultIterator.hasNext()) {
       for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
         converted.add(convertRow(r));
       }
+      if (converted.size() >= batchSize) {
+        break;
+      }
     }
     return converted;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f1aade9..a53c365 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1056,6 +1056,23 @@ public final class CarbonProperties {
     return batchSize;
   }
 
+  public static int getQueryBatchSize() {
+    int batchSize;
+    String batchSizeString =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
+    if (null != batchSizeString) {
+      try {
+        batchSize = Integer.parseInt(batchSizeString);
+      } catch (NumberFormatException ne) {
+        LOGGER.error("Invalid inmemory records size. Using default value");
+        batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+      }
+    } else {
+      batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+    }
+    return batchSize;
+  }
+
   public long getHandoffSize() {
     Long handoffSize;
     try {
@@ -1507,12 +1524,6 @@ public final class CarbonProperties {
     return Boolean.parseBoolean(pushFilters);
   }
 
-  public boolean isRangeCompactionAllowed() {
-    String isRangeCompact = getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
-        CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
-    return Boolean.parseBoolean(isRangeCompact);
-  }
-
   private void validateSortMemorySpillPercentage() {
     String spillPercentageStr = carbonProperties.getProperty(
         CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4f4386b..656166d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -297,8 +297,7 @@ class CarbonMergerRDD[K, V](
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var rangeColumn: CarbonColumn = null
-    if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
-        !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       // If the table is not a partition table then only we go for range column compaction flow
       rangeColumn = carbonTable.getRangeColumn
     }
@@ -339,6 +338,7 @@ class CarbonMergerRDD[K, V](
         java.util.HashMap[String, java.util.List[CarbonInputSplit]]
 
     var totalSize: Double = 0
+    var totalTaskCount: Integer = 0
     var loadMetadataDetails: Array[LoadMetadataDetails] = null
     // Only for range column get the details for the size of segments
     if (null != rangeColumn) {
@@ -386,17 +386,25 @@ class CarbonMergerRDD[K, V](
             updateDetails, updateStatusManager)))) &&
         FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
       }
+      if (rangeColumn != null) {
+        totalTaskCount = totalTaskCount +
+                         CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
+      }
       carbonInputSplits ++:= filteredSplits
       allSplits.addAll(filteredSplits.asJava)
     }
+    totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)
     var singleRange = false
     if (rangeColumn != null) {
-      // To calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
+      // Calculate the number of ranges to be made, min 2 ranges/tasks to be made in any case
+      // We take the minimum of average number of tasks created during load time and the number
+      // of tasks we get based on size for creating ranges.
       val numOfPartitions = Math
-        .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
-          .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, true))
+        .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt,
+          Math.min(totalTaskCount, DataLoadProcessBuilderOnSpark
+            .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, true)))
       val colName = rangeColumn.getColName
       LOGGER.info(s"Compacting on range column: $colName")
       allRanges = getRangesFromRDD(rangeColumn,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index f4a15bb..c3017a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -19,8 +19,10 @@ package org.apache.carbondata.processing.merger;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -601,6 +603,15 @@ public class CarbonCompactionUtil {
     return minMaxVals;
   }
 
+  public static int getTaskCountForSegment(CarbonInputSplit[] splits) {
+    Set<String> taskIdSet = new HashSet<>();
+    for (CarbonInputSplit split : splits) {
+      String taskId = split.taskId;
+      taskIdSet.add(taskId);
+    }
+    return taskIdSet.size();
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index e66e233..8aaeb9d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -741,7 +741,8 @@ public class CarbonFactDataHandlerModel {
       this.numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
     }
 
-    if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+    if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)
+        && tableSpec.getCarbonTable().getRangeColumn() != null) {
       this.numberOfCores = 1;
     }
     // Overriding it to the task specified cores.