You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/09 14:45:17 UTC
carbondata git commit: [CARBONDATA-1624] Set the default value of
'carbon.number.of.cores.while.loading' as per the spark conf
'spark.executor.cores'
Repository: carbondata
Updated Branches:
refs/heads/master f3b507cb7 -> 9e9d68988
[CARBONDATA-1624] Set the default value of 'carbon.number.of.cores.while.loading' as per the spark conf 'spark.executor.cores'
1. Use 'spark.executor.cores' as the default value for 'carbon.number.of.cores.while.loading'
2. Use 'CarbonProperties.getNumberOfCores()' uniformly to read 'carbon.number.of.cores.while.loading'
This closes #1455
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e9d6898
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e9d6898
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e9d6898
Branch: refs/heads/master
Commit: 9e9d68988e29a9c3a2520189d822835562f4a34d
Parents: f3b507c
Author: Zhang Zhichao <44...@qq.com>
Authored: Tue Oct 31 10:55:38 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Nov 9 22:44:38 2017 +0800
----------------------------------------------------------------------
.../dictionary/AbstractDictionaryCache.java | 8 +-------
.../generator/TableDictionaryGenerator.java | 10 +---------
.../reader/CarbonDeleteFilesDataReader.java | 8 +-------
.../carbondata/core/util/CarbonProperties.java | 6 ++++--
.../testsuite/datamap/DataMapWriterSuite.scala | 7 ++++---
.../command/management/LoadTableCommand.scala | 20 ++++++++++++++++++++
.../sort/sortdata/SortParameters.java | 20 ++------------------
.../store/CarbonFactDataHandlerColumnar.java | 11 +----------
8 files changed, 34 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index 4046364..e145cb8 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -72,13 +72,7 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
* max number of threads for a job
*/
private void initThreadPoolSize() {
- try {
- thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- } catch (NumberFormatException e) {
- thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index f08ba1f..905d2fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.devapi.DictionaryGenerator;
@@ -72,14 +71,7 @@ public class TableDictionaryGenerator
}
@Override public void writeDictionaryData() {
- int numOfCores = 1;
- try {
- numOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- } catch (NumberFormatException e) {
- numOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ int numOfCores = CarbonProperties.getInstance().getNumberOfCores();
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(numOfCores);
for (final DictionaryGenerator generator : columnMap.values()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
index 6739b41..cc6e53f 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -65,13 +65,7 @@ public class CarbonDeleteFilesDataReader {
* max number of threads for a job
*/
private void initThreadPoolSize() {
- try {
- thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- } catch (NumberFormatException e) {
- thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ thread_pool_size = CarbonProperties.getInstance().getNumberOfCores();
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index cdd6183..678a6f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -669,9 +669,11 @@ public final class CarbonProperties {
int numberOfCores;
try {
numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+ .getProperty(CarbonCommonConstants.NUM_CORES_LOADING));
} catch (NumberFormatException exc) {
+ LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ + " is wrong. Falling back to the default value "
+ + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
return numberOfCores;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index ff900ce..888c97d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -18,13 +18,11 @@
package org.apache.carbondata.spark.testsuite.datamap
import java.util
-
import scala.collection.JavaConverters._
-
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
@@ -117,6 +115,9 @@ class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty("carbon.blockletgroup.size.in.mb", "1")
+ CarbonProperties.getInstance()
+ .addProperty("carbon.number.of.cores.while.loading",
+ CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
val df = buildTestData(300000)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 630ee27..bda6829 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -84,6 +84,26 @@ case class LoadTableCommand(
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
+
+ // get the value of 'spark.executor.cores' from spark conf, default value is 1
+ val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
+ // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,
+ // default value is the value of 'spark.executor.cores'
+ val numCoresLoading =
+ try {
+ CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores)
+ } catch {
+ case exc: NumberFormatException =>
+ LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ + " is wrong. Falling back to the default value "
+ + sparkExecutorCores)
+ sparkExecutorCores
+ }
+
+ // update the property with new value
+ carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
+
val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
val tableProperties = relation.tableMeta.carbonTable.getTableInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 39e1049..4da4c84 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -418,15 +418,7 @@ public class SortParameters implements Serializable {
parameters.setTempFileLocation(sortTempDirs);
LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
- int numberOfCores;
- try {
- numberOfCores = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- numberOfCores = numberOfCores / 2;
- } catch (NumberFormatException exc) {
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ int numberOfCores = carbonProperties.getNumberOfCores() / 2;
parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
@@ -539,15 +531,7 @@ public class SortParameters implements Serializable {
parameters.setTempFileLocation(sortTempDirs);
LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
- int numberOfCores;
- try {
- numberOfCores = Integer.parseInt(carbonProperties
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- numberOfCores = numberOfCores / 2;
- } catch (NumberFormatException exc) {
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ int numberOfCores = carbonProperties.getNumberOfCores() / 2;
parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e9d6898/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 7882cd4..504e7ec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -223,16 +223,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
} else {
- try {
- numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
- CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
- } catch (NumberFormatException exc) {
- LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
- + "is wrong.Falling back to the default value "
- + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
- }
+ numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
}
if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {