You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:28 UTC

[27/50] [abbrv] carbondata git commit: [CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage level when insert into data with 'carbon.insert.persist.enable'='true'

[CARBONDATA-2593] Add an option 'carbon.insert.storage.level' to support configuring the storage level when insert into data with 'carbon.insert.persist.enable'='true'

When inserting data with 'carbon.insert.persist.enable'='true', the storage level of the dataset is fixed to 'MEMORY_AND_DISK'.
The storage level should be configurable so that it can be tuned for different environments.

This closes #2373


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181f0ac9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181f0ac9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181f0ac9

Branch: refs/heads/carbonstore
Commit: 181f0ac9bed6ff7d83268f6c058aee943b348ddc
Parents: f0c8834
Author: Zhang Zhichao <44...@qq.com>
Authored: Thu Jun 14 14:48:47 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Jun 16 03:32:36 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 25 ++++++++++++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 18 ++++++++++++++
 docs/configuration-parameters.md                |  4 ++++
 .../management/CarbonInsertIntoCommand.scala    |  5 ++--
 4 files changed, 50 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c7281dd..19ff494 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -866,6 +866,7 @@ public final class CarbonCommonConstants {
    * to run load and insert queries on source table concurrently then user can enable this flag
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_INSERT_PERSIST_ENABLED = "carbon.insert.persist.enable";
 
   /**
@@ -875,6 +876,27 @@ public final class CarbonCommonConstants {
   public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false";
 
   /**
+   * Which storage level to persist dataset when insert into data
+   * with 'carbon.insert.persist.enable'='true'
+   */
+  @CarbonProperty
+  @InterfaceStability.Evolving
+  public static final String CARBON_INSERT_STORAGE_LEVEL =
+      "carbon.insert.storage.level";
+
+  /**
+   * The default value(MEMORY_AND_DISK) is the same as the default storage level of Dataset.
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   *
+   * If the executors have limited memory, set CARBON_INSERT_STORAGE_LEVEL
+   * to MEMORY_AND_DISK_SER or another storage level that suits the environment.
+   * You can get more recommendations about storage level in spark website:
+   * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_INSERT_STORAGE_LEVEL_DEFAULT = "MEMORY_AND_DISK";
+
+  /**
    * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
@@ -1094,6 +1116,7 @@ public final class CarbonCommonConstants {
    * to determine to use the rdd persist or not.
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
@@ -1117,6 +1140,7 @@ public final class CarbonCommonConstants {
    * with 'carbon.update.persist.enable'='true'
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_UPDATE_STORAGE_LEVEL =
       "carbon.update.storage.level";
 
@@ -1354,6 +1378,7 @@ public final class CarbonCommonConstants {
    * Which storage level to persist rdd when sort_scope=global_sort
    */
   @CarbonProperty
+  @InterfaceStability.Evolving
   public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL =
       "carbon.global.sort.rdd.storage.level";
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 6eb7de6..b134a7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1581,4 +1581,22 @@ public final class CarbonProperties {
       return defaultValue;
     }
   }
+
+  /**
+   * Return valid storage level for CARBON_INSERT_STORAGE_LEVEL
+   * @return String
+   */
+  public String getInsertIntoDatasetStorageLevel() {
+    String storageLevel = getProperty(CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.warn("The " + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage level("
+          + CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT
+          + ") to persist dataset.");
+      storageLevel = CarbonCommonConstants.CARBON_INSERT_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 11cc6ea..f81959e 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -55,7 +55,11 @@ This section provides the details of all the configurations required for CarbonD
 | carbon.max.driver.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded at the driver side. This value is expressed in MB. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. |  |
 | carbon.max.executor.lru.cache.size | -1 | Max LRU cache size upto which data will be loaded at the executor side. This value is expressed in MB. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. If this parameter is not configured, then the carbon.max.driver.lru.cache.size value will be considered. |  |
 | carbon.merge.sort.prefetch | true | Enable prefetch of data during merge sort while reading data from sort temp files in data loading. |  |
+| carbon.insert.persist.enable | false | Enabling this parameter persists the dataframe before the insert into operation. If an insert into query that selects from a source table is executed while the same source table is being loaded concurrently, the select may read new records for which the dictionary has not been generated yet, which leads to inconsistency. To avoid this condition, the dataframe can be persisted to MEMORY_AND_DISK (the default storage level) before the insert into operation is performed. The default value is false because the dataframe does not need to be persisted in all cases. Enable this parameter if load and insert queries need to run on the source table concurrently. |  |
+| carbon.insert.storage.level | MEMORY_AND_DISK | Storage level used to persist the dataframe when 'carbon.insert.persist.enable'=true. If the executors have limited memory, set this parameter to 'MEMORY_AND_DISK_SER' or another storage level that suits the environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |  |
 | carbon.update.persist.enable | true | Enabling this parameter considers persistent data. Enabling this will reduce the execution time of UPDATE operation. |  |
+| carbon.update.storage.level | MEMORY_AND_DISK | Storage level used to persist the dataframe when 'carbon.update.persist.enable'=true. If the executors have limited memory, set this parameter to 'MEMORY_AND_DISK_SER' or another storage level that suits the environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |  |
+| carbon.global.sort.rdd.storage.level | MEMORY_ONLY | Storage level used to persist the RDD when loading data with 'sort_scope'='global_sort'. If the executors have limited memory, set this parameter to 'MEMORY_AND_DISK_SER' or another storage level that suits the environment. [See detail](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). |  |
 | carbon.load.global.sort.partitions | 0 | The Number of partitions to use when shuffling data for sort. If user don't configurate or configurate it less than 1, it uses the number of map tasks as reduce tasks. In general, we recommend 2-3 tasks per CPU core in your cluster.
 | carbon.options.bad.records.logger.enable | false | Whether to create logs with details about bad records. | |
 | carbon.bad.records.action | FORCE | This property can have four types of actions for bad records FORCE, REDIRECT, IGNORE and FAIL. If set to FORCE then it auto-corrects the data by storing the bad records as NULL. If set to REDIRECT then bad records are written to the raw CSV instead of being loaded. If set to IGNORE then bad records are neither loaded nor written to the raw CSV. If set to FAIL then data loading fails if any bad records are found. | |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181f0ac9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 702f954..6c74ad2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -53,8 +53,9 @@ case class CarbonInsertIntoCommand(
     val df =
       if (isPersistRequired) {
         LOGGER.info("Persist enabled for Insert operation")
-        Dataset.ofRows(sparkSession, child)
-          .persist(StorageLevel.MEMORY_AND_DISK)
+        Dataset.ofRows(sparkSession, child).persist(
+          StorageLevel.fromString(
+            CarbonProperties.getInstance.getInsertIntoDatasetStorageLevel))
       } else {
         Dataset.ofRows(sparkSession, child)
       }