You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by zh...@apache.org on 2023/02/20 02:05:17 UTC

[hudi] branch master updated: [HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)

This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be92be657a3 [HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)
be92be657a3 is described below

commit be92be657a348954cc21062ca24e8a10caea17ee
Author: gaoshihang <47...@qq.com>
AuthorDate: Mon Feb 20 10:05:09 2023 +0800

    [HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)
    
    * Add a new config to specify the Spark write RDD storage level
    
    * update
    
    * update
    
    * update
    
    ---------
    
    Co-authored-by: gaoshihang <pg...@apac.freewheel.com>
---
 .../main/java/org/apache/hudi/config/HoodieWriteConfig.java    | 10 ++++++++++
 .../table/action/commit/BaseSparkCommitActionExecutor.java     |  4 ++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3da856f61ac..e3b4f046bae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -127,6 +127,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
 
+  public static final ConfigProperty<String> TAGGED_RECORD_STORAGE_LEVEL_VALUE = ConfigProperty
+       .key("hoodie.write.tagged.record.storage.level")
+       .defaultValue("MEMORY_AND_DISK_SER")
+       .withDocumentation("Determine what level of persistence is used to cache write RDDs. "
+          + "Refer to org.apache.spark.storage.StorageLevel for different values");
+
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
       .defaultValue("ts")
@@ -1069,6 +1075,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getSchema();
   }
 
+  public String getTaggedRecordStorageLevel() {
+    return getString(TAGGED_RECORD_STORAGE_LEVEL_VALUE);
+  }
+
   public String getInternalSchema() {
     return getString(INTERNAL_SCHEMA_STRING);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ac5b8555b05..73c855d5505 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,10 +148,10 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
     // Cache the tagged records, so we don't end up computing both
-    // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
     JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
     if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
-      inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+      String writeStorageLevel = config.getTaggedRecordStorageLevel();
+      inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
     } else {
       LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
     }