You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:01:34 UTC

[15/51] [abbrv] carbondata git commit: [CARBONDATA-1462]Add an option 'carbon.update.storage.level' to support configuring the storage level when updating data with 'carbon.update.persist.enable'='true'

[CARBONDATA-1462]Add an option 'carbon.update.storage.level' to support configuring the storage level when updating data with 'carbon.update.persist.enable'='true'

When updating data with 'carbon.update.persist.enable'='true' (the default), the storage level of the dataset is always 'MEMORY_AND_DISK'. The storage level should be configurable so that it can be adapted to different environments.

This closes #1340


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0ab928e9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0ab928e9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0ab928e9

Branch: refs/heads/branch-1.2
Commit: 0ab928e9c1730d69a3fcd1805c26ef1200214fc9
Parents: 8b38e0b
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Sep 8 13:27:42 2017 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Sep 11 20:33:57 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 20 +++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 36 ++++++++++++++++++++
 .../sql/execution/command/IUDCommands.scala     | 17 ++-------
 3 files changed, 59 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6c116a7..5a68f60 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1091,6 +1091,26 @@ public final class CarbonCommonConstants {
   public static final String defaultValueIsPersistEnabled = "true";
 
   /**
+   * The storage level used to persist the dataset when updating data
+   * with 'carbon.update.persist.enable'='true'
+   */
+  @CarbonProperty
+  public static final String CARBON_UPDATE_STORAGE_LEVEL =
+      "carbon.update.storage.level";
+
+  /**
+   * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset.
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   *
+   * If the user's executors have limited memory, set CARBON_UPDATE_STORAGE_LEVEL
+   * to MEMORY_AND_DISK_SER or another storage level that suits the environment.
+   * More recommendations about storage levels are available on the Spark website:
+   * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_UPDATE_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK";
+
+  /**
    * current data file version
    */
   public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 4e9c21a..0ab28e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -890,6 +890,42 @@ public final class CarbonProperties {
   }
 
   /**
+   * Return whether the update dataset is persisted (CarbonCommonConstants.isPersistEnabled).
+   * @return true if the value equals "true" ignoring case; invalid values use the default
+   */
+  public boolean isPersistUpdateDataset() {
+    String isPersistEnabled = getProperty(CarbonCommonConstants.isPersistEnabled,
+            CarbonCommonConstants.defaultValueIsPersistEnabled);
+    boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled);
+    if (!validatePersistEnabled) {
+      LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled
+          + " configuration value is invalid. It will use default value("
+          + CarbonCommonConstants.defaultValueIsPersistEnabled
+          + ").");
+      isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled;
+    }
+    return isPersistEnabled.equalsIgnoreCase("true");
+  }
+
+  /**
+   * Return the upper-cased storage level configured by CARBON_UPDATE_STORAGE_LEVEL.
+   * @return the configured level, or CARBON_UPDATE_STORAGE_LEVEL_DEFAULT if it is invalid
+   */
+  public String getUpdateDatasetStorageLevel() {
+    String storageLevel = getProperty(CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage level("
+          + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT
+          + ") to persist dataset.");
+      storageLevel = CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
+
+  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0ab928e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index d3a80d4..5820b9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -175,16 +175,7 @@ private[sql] case class ProjectForUpdateCommand(
     val currentTime = CarbonUpdateUtil.readCurrentTime
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-        .getProperty(CarbonCommonConstants.isPersistEnabled,
-          CarbonCommonConstants.defaultValueIsPersistEnabled)
-    var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
-    if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
-      isPersistEnabled = false
-    }
-    else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
-      isPersistEnabled = true
-    }
+    var isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -199,13 +190,11 @@ private[sql] case class ProjectForUpdateCommand(
       // Get RDD.
 
       dataSet = if (isPersistEnabled) {
-        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.MEMORY_AND_DISK)
-        //          DataFrame(sqlContext, plan)
-        //            .persist(StorageLevel.MEMORY_AND_DISK)
+        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
       }
       else {
         Dataset.ofRows(sparkSession, plan)
-        //          DataFrame(sqlContext, plan)
       }
       var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")