You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/09 14:45:17 UTC

carbondata git commit: [CARBONDATA-1624]Set the default value of 'carbon.number.of.cores.while.loading' as per the spark conf 'spark.executor.cores'

Repository: carbondata
Updated Branches:
  refs/heads/master f3b507cb7 -> 9e9d68988


[CARBONDATA-1624]Set the default value of 'carbon.number.of.cores.while.loading' as per the spark conf 'spark.executor.cores'

1.Use 'spark.executor.cores' as the default value for 'carbon.number.of.cores.while.loading'
2.Use 'CarbonProperties.getNumberOfCores()' to get 'carbon.number.of.cores.while.loading' uniformly

This closes #1455


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e9d6898
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e9d6898
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e9d6898

Branch: refs/heads/master
Commit: 9e9d68988e29a9c3a2520189d822835562f4a34d
Parents: f3b507c
Author: Zhang Zhichao <44...@qq.com>
Authored: Tue Oct 31 10:55:38 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Nov 9 22:44:38 2017 +0800

----------------------------------------------------------------------
 .../dictionary/AbstractDictionaryCache.java     |  8 +-------
 .../generator/TableDictionaryGenerator.java     | 10 +---------
 .../reader/CarbonDeleteFilesDataReader.java     |  8 +-------
 .../carbondata/core/util/CarbonProperties.java  |  6 ++++--
 .../testsuite/datamap/DataMapWriterSuite.scala  |  7 ++++---
 .../command/management/LoadTableCommand.scala   | 20 ++++++++++++++++++++
 .../sort/sortdata/SortParameters.java           | 20 ++------------------
 .../store/CarbonFactDataHandlerColumnar.java    | 11 +----------
 8 files changed, 34 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 4046364..e145cb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -72,13 +72,7 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
    * max number of threads for a job
    */
   private void initThreadPoolSize() {
-    try {
-      thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-    } catch (NumberFormatException e) {
-      thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
+    thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index f08ba1f..905d2fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.devapi.DictionaryGenerator;
@@ -72,14 +71,7 @@ public class TableDictionaryGenerator
   }
 
   @Override public void writeDictionaryData() {
-    int numOfCores = 1;
-    try {
-      numOfCores = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-                      CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-    } catch (NumberFormatException e) {
-      numOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
+    int numOfCores = CarbonProperties.getInstance().getNumberOfCores();
     long start = System.currentTimeMillis();
     ExecutorService executorService = Executors.newFixedThreadPool(numOfCores);
     for (final DictionaryGenerator generator : columnMap.values()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index 6739b41..cc6e53f 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -65,13 +65,7 @@ public class CarbonDeleteFilesDataReader {
    * max number of threads for a job
    */
   private void initThreadPoolSize() {
-    try {
-      thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-    } catch (NumberFormatException e) {
-      thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
+    thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index cdd6183..678a6f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -669,9 +669,11 @@ public final class CarbonProperties {
     int numberOfCores;
     try {
       numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING));
     } catch (NumberFormatException exc) {
+      LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+          + " is wrong. Falling back to the default value "
+          + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
     }
     return numberOfCores;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index ff900ce..888c97d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -18,13 +18,11 @@
 package org.apache.carbondata.spark.testsuite.datamap
 
 import java.util
-
 import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
@@ -117,6 +115,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
 
     CarbonProperties.getInstance()
       .addProperty("carbon.blockletgroup.size.in.mb", "1")
+    CarbonProperties.getInstance()
+      .addProperty("carbon.number.of.cores.while.loading",
+          CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
 
     val df = buildTestData(300000)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 630ee27..bda6829 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -84,6 +84,26 @@ case class LoadTableCommand(
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
+
+    // get the value of 'spark.executor.cores' from spark conf, default value is 1
+    val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
+    // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,
+    // default value is the value of 'spark.executor.cores'
+    val numCoresLoading =
+      try {
+        CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores)
+      } catch {
+        case exc: NumberFormatException =>
+          LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+              + " is wrong. Falling back to the default value "
+              + sparkExecutorCores)
+          sparkExecutorCores
+      }
+
+    // update the property with new value
+    carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
+
     val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
 
     val tableProperties = relation.tableMeta.carbonTable.getTableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 39e1049..4da4c84 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -418,15 +418,7 @@ public class SortParameters implements Serializable {
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
-    int numberOfCores;
-    try {
-      numberOfCores = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      numberOfCores = numberOfCores / 2;
-    } catch (NumberFormatException exc) {
-      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
+    int numberOfCores = carbonProperties.getNumberOfCores() / 2;
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
@@ -539,15 +531,7 @@ public class SortParameters implements Serializable {
     parameters.setTempFileLocation(sortTempDirs);
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
-    int numberOfCores;
-    try {
-      numberOfCores = Integer.parseInt(carbonProperties
-          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      numberOfCores = numberOfCores / 2;
-    } catch (NumberFormatException exc) {
-      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-    }
+    int numberOfCores = carbonProperties.getNumberOfCores() / 2;
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7882cd4..504e7ec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -223,16 +223,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       }
     } else {
-      try {
-        numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
-                CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
-      } catch (NumberFormatException exc) {
-        LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
-            + "is wrong.Falling back to the default value "
-            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-        numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
-      }
+      numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
     }
 
     if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {