You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 18:00:00 UTC

[33/50] [abbrv] carbondata git commit: [CARBONDATA-3031] refining usage of numberofcores in CarbonProperties

[CARBONDATA-3031] refining usage of numberofcores in CarbonProperties

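1. Many places call the method 'getNumberOfCores' of CarbonProperties, which returns the number of cores configured for data loading.
2. Using that value in other scenarios such as 'query' or 'compaction' is therefore confusing, so this change replaces it with separate accessors for loading, compacting and alter-partition cores.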

This closes #2907


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/518e2b6d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/518e2b6d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/518e2b6d

Branch: refs/heads/branch-1.5
Commit: 518e2b6da468e2cd377c1f931f91f7b96ca0fbe0
Parents: 22f8722
Author: Sssan520 <li...@aliyun.com>
Authored: Mon Jul 2 19:12:24 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 21 22:43:46 2018 +0530

----------------------------------------------------------------------
 .../dictionary/AbstractDictionaryCache.java     |  2 +-
 .../generator/TableDictionaryGenerator.java     |  2 +-
 .../reader/CarbonDeleteFilesDataReader.java     |  6 +++-
 .../carbondata/core/util/CarbonProperties.java  | 35 +++++++++++++++-----
 .../CarbonAlterTableDropPartitionCommand.scala  |  4 +--
 .../CarbonAlterTableSplitPartitionCommand.scala |  6 ++--
 .../loading/CarbonDataLoadConfiguration.java    | 10 ++++++
 .../loading/DataLoadProcessBuilder.java         |  2 ++
 .../processing/merger/CarbonDataMergerUtil.java |  3 +-
 .../merger/CompactionResultSortProcessor.java   |  4 ++-
 .../sort/sortdata/SortParameters.java           |  8 +++--
 .../store/CarbonFactDataHandlerModel.java       | 13 ++------
 .../util/CarbonDataProcessorUtil.java           |  2 +-
 13 files changed, 62 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 83c7237..36d5f98 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -70,7 +70,7 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
    * max number of threads for a job
    */
   private void initThreadPoolSize() {
-    thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
+    thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index 33a91d8..003ab5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -78,7 +78,7 @@ public class TableDictionaryGenerator
   }
 
   @Override public void writeDictionaryData() {
-    int numOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    int numOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
     long start = System.currentTimeMillis();
     ExecutorService executorService = Executors.newFixedThreadPool(numOfCores);
     for (final DictionaryGenerator generator : columnMap.values()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index 32eb60d..ee87a75 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -60,12 +60,16 @@ public class CarbonDeleteFilesDataReader {
     initThreadPoolSize();
   }
 
+  public CarbonDeleteFilesDataReader(int thread_pool_size) {
+    this.thread_pool_size = thread_pool_size;
+  }
+
   /**
    * This method will initialize the thread pool size to be used for creating the
    * max number of threads for a job
    */
   private void initThreadPoolSize() {
-    thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
+    thread_pool_size = CarbonProperties.getInstance().getNumberOfLoadingCores();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index d2f812e..f4a75a8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -949,20 +949,15 @@ public final class CarbonProperties {
     return compactionSize;
   }
 
-  /**
-   * Number of cores should be used while loading data.
-   *
-   * @return
-   */
-  public int getNumberOfCores() {
+  private int getNumberOfCores(String key) {
     int numberOfCores;
     try {
       numberOfCores = Integer.parseInt(
           CarbonProperties.getInstance().getProperty(
-              CarbonCommonConstants.NUM_CORES_LOADING,
+              key,
               CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
     } catch (NumberFormatException exc) {
-      LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+      LOGGER.warn("Configured value for property " + key
           + " is wrong. Falling back to the default value "
           + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
@@ -971,6 +966,30 @@ public final class CarbonProperties {
   }
 
   /**
+   * Number of cores should be used while loading data.
+   * @return the number of cores to be used while loading data
+   */
+  public int getNumberOfLoadingCores() {
+    return getNumberOfCores(CarbonCommonConstants.NUM_CORES_LOADING);
+  }
+
+  /**
+   * Number of cores to be used while compacting.
+   * @return the number of cores to be used while compacting
+   */
+  public int getNumberOfCompactingCores() {
+    return getNumberOfCores(CarbonCommonConstants.NUM_CORES_COMPACTING);
+  }
+
+  /**
+   * Number of cores to be used while alter partition.
+   * @return the number of cores to be used while alter partition
+   */
+  public int getNumberOfAltPartitionCores() {
+    return getNumberOfCores(CarbonCommonConstants.NUM_CORES_ALT_PARTITION);
+  }
+
+  /**
    * Get the sort chunk memory size
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index d4a2d7f..832cb00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -197,9 +197,7 @@ case class CarbonAlterTableDropPartitionCommand(
       partitionId: String,
       dropWithData: Boolean,
       oldPartitionIds: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores
     val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 18c47a9..f17cdd6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -215,10 +215,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       partitionId: String,
       oldPartitionIdList: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
-    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    val numberOfCores = CarbonProperties.getInstance().getNumberOfAltPartitionCores
+    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores)
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 616edeb..d3501c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -127,6 +127,8 @@ public class CarbonDataLoadConfiguration {
    */
   private String columnCompressor;
 
+  private int numberOfLoadingCores;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -460,4 +462,12 @@ public class CarbonDataLoadConfiguration {
   public void setColumnCompressor(String columnCompressor) {
     this.columnCompressor = columnCompressor;
   }
+
+  public int getNumberOfLoadingCores() {
+    return numberOfLoadingCores;
+  }
+
+  public void setNumberOfLoadingCores(int numberOfLoadingCores) {
+    this.numberOfLoadingCores = numberOfLoadingCores;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 4926cd8..89d09fe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -312,6 +312,8 @@ public final class DataLoadProcessBuilder {
     if (loadModel.getSdkWriterCores() > 0) {
       configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
     }
+    configuration.setNumberOfLoadingCores(CarbonProperties.getInstance().getNumberOfLoadingCores());
+
     configuration.setColumnCompressor(loadModel.getColumnCompressor());
     return configuration;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index fc139a6..a9a6085 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1265,7 +1265,8 @@ public final class CarbonDataMergerUtil {
       String blockName, String fullBlockFilePath) throws IOException {
 
     DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
-    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
+    int numberOfcores = CarbonProperties.getInstance().getNumberOfCompactingCores();
+    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(numberOfcores);
     try {
       deleteDeltaBlockDetails =
               dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 7c7b8ee..d14f626 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -450,11 +451,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * @return
    */
   private SortParameters createSortParameters() {
+    int numberOfCompactingCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
     return SortParameters
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
             noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
-            sortColumnMapping, isVarcharDimMapping, true);
+            sortColumnMapping, isVarcharDimMapping, true, numberOfCompactingCores / 2);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 7908f4f..200c5f4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -442,10 +442,12 @@ public class SortParameters implements Serializable {
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
-    int numberOfCores = carbonProperties.getNumberOfCores() / 2;
+    int numberOfCores = 1;
     // In case of loading from partition we should use the cores specified by it
     if (configuration.getWritingCoresCount() > 0) {
       numberOfCores = configuration.getWritingCoresCount();
+    } else {
+      numberOfCores = configuration.getNumberOfLoadingCores() / 2;
     }
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
@@ -486,7 +488,8 @@ public class SortParameters implements Serializable {
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
       int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping,
-      boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
+      boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow,
+      int numberOfCores) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setCarbonTable(carbonTable);
@@ -526,7 +529,6 @@ public class SortParameters implements Serializable {
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
-    int numberOfCores = carbonProperties.getNumberOfCores() / 2;
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 92c48bc..f70e749 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -688,18 +688,9 @@ public class CarbonFactDataHandlerModel {
     // in compaction flow the measure with decimal type will come as spark decimal.
     // need to convert it to byte array.
     if (this.isCompactionFlow()) {
-      try {
-        this.numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
-                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      } catch (NumberFormatException exc) {
-        LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_COMPACTING
-            + "is wrong.Falling back to the default value "
-            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-        this.numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-      }
+      this.numberOfCores = CarbonProperties.getInstance().getNumberOfCompactingCores();
     } else {
-      this.numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+      this.numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
     }
 
     if (this.sortScope != null && this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/518e2b6d/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 437f628..1d1f451 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -686,7 +686,7 @@ public final class CarbonDataProcessorUtil {
     if (sdkWriterCores > 0) {
       numberOfCores = sdkWriterCores;
     } else {
-      numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+      numberOfCores = CarbonProperties.getInstance().getNumberOfLoadingCores();
     }
     // Get the minimum of number of cores and iterators size to get the number of parallel threads
     // to be launched.