You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:34 UTC
[15/51] [abbrv] carbondata git commit: [CARBONDATA-1462]Add an option
'carbon.update.storage.level' to support configuring the storage level when
updating data with 'carbon.update.persist.enable'='true'
[CARBONDATA-1462]Add an option 'carbon.update.storage.level' to support configuring the storage level when updating data with 'carbon.update.persist.enable'='true'
When updating data with 'carbon.update.persist.enable'='true' (the default), the storage level of the dataset is fixed to 'MEMORY_AND_DISK'. It should be possible to configure the storage level to suit different environments.
This closes #1340
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0ab928e9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0ab928e9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0ab928e9
Branch: refs/heads/branch-1.2
Commit: 0ab928e9c1730d69a3fcd1805c26ef1200214fc9
Parents: 8b38e0b
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Sep 8 13:27:42 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Sep 11 20:33:57 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 20 +++++++++++
.../carbondata/core/util/CarbonProperties.java | 36 ++++++++++++++++++++
.../sql/execution/command/IUDCommands.scala | 17 ++-------
3 files changed, 59 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6c116a7..5a68f60 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1091,6 +1091,26 @@ public final class CarbonCommonConstants {
public static final String defaultValueIsPersistEnabled = "true";
/**
+ * Which storage level to persist dataset when updating data
+ * with 'carbon.update.persist.enable'='true'
+ */
+ @CarbonProperty
+ public static final String CARBON_UPDATE_STORAGE_LEVEL =
+ "carbon.update.storage.level";
+
+ /**
+ * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset.
+ * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+ * recomputing the in-memory columnar representation of the underlying table is expensive.
+ *
+ * If the executors have limited memory, set CARBON_UPDATE_STORAGE_LEVEL
+ * to MEMORY_AND_DISK_SER or another storage level that suits the environment.
+ * More recommendations about storage levels are available on the Spark website:
+ * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+ */
+ public static final String CARBON_UPDATE_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK";
+
+ /**
* current data file version
*/
public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 4e9c21a..0ab28e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -890,6 +890,42 @@ public final class CarbonProperties {
}
/**
+ * Tells whether persisting is enabled, as read from CarbonCommonConstants.isPersistEnabled.
+ * @return true if the value is "true" (any case); a value failing validation is logged, default used
+ */
+ public boolean isPersistUpdateDataset() {
+ String isPersistEnabled = getProperty(CarbonCommonConstants.isPersistEnabled,
+ CarbonCommonConstants.defaultValueIsPersistEnabled);
+ boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled);
+ if (!validatePersistEnabled) {
+ LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled
+ + " configuration value is invalid. It will use default value("
+ + CarbonCommonConstants.defaultValueIsPersistEnabled
+ + ").");
+ isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled;
+ }
+ return isPersistEnabled.equalsIgnoreCase("true");
+ }
+
+ /**
+ * Returns the storage level configured by CARBON_UPDATE_STORAGE_LEVEL, converted to upper case.
+ * @return the configured level, or CARBON_UPDATE_STORAGE_LEVEL_DEFAULT if validation fails
+ */
+ public String getUpdateDatasetStorageLevel() {
+ String storageLevel = getProperty(CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL,
+ CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT);
+ boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
+ if (!validateStorageLevel) {
+ LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+ + " configuration value is invalid. It will use default storage level("
+ + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT
+ + ") to persist dataset.");
+ storageLevel = CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT;
+ }
+ return storageLevel.toUpperCase();
+ }
+
+ /**
* returns true if carbon property
* @param key
* @return
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index d3a80d4..5820b9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -175,16 +175,7 @@ private[sql] case class ProjectForUpdateCommand(
val currentTime = CarbonUpdateUtil.readCurrentTime
// var dataFrame: DataFrame = null
var dataSet: DataFrame = null
- val isPersistEnabledUserValue = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.isPersistEnabled,
- CarbonCommonConstants.defaultValueIsPersistEnabled)
- var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
- if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
- isPersistEnabled = false
- }
- else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
- isPersistEnabled = true
- }
+ var isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
try {
lockStatus = metadataLock.lockWithRetries()
if (lockStatus) {
@@ -199,13 +190,11 @@ private[sql] case class ProjectForUpdateCommand(
// Get RDD.
dataSet = if (isPersistEnabled) {
- Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK)
- // DataFrame(sqlContext, plan)
- // .persist(StorageLevel.MEMORY_AND_DISK)
+ Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+ CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
}
else {
Dataset.ofRows(sparkSession, plan)
- // DataFrame(sqlContext, plan)
}
var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")