You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by zh...@apache.org on 2023/02/20 02:05:17 UTC
[hudi] branch master updated: [HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)
This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be92be657a3 [HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)
be92be657a3 is described below
commit be92be657a348954cc21062ca24e8a10caea17ee
Author: gaoshihang <47...@qq.com>
AuthorDate: Mon Feb 20 10:05:09 2023 +0800
[HUDI-5786] Add a new config to specify the Spark write RDD storage level (#7941)
* add a new config to specify the Spark write RDD storage level
* update
* update
* update
---------
Co-authored-by: gaoshihang <pg...@apac.freewheel.com>
---
.../main/java/org/apache/hudi/config/HoodieWriteConfig.java | 10 ++++++++++
.../table/action/commit/BaseSparkCommitActionExecutor.java | 4 ++--
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3da856f61ac..e3b4f046bae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -127,6 +127,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
+ public static final ConfigProperty<String> TAGGED_RECORD_STORAGE_LEVEL_VALUE = ConfigProperty
+ .key("hoodie.write.tagged.record.storage.level")
+ .defaultValue("MEMORY_AND_DISK_SER")
+ .withDocumentation("Determine what level of persistence is used to cache write RDDs. "
+ + "Refer to org.apache.spark.storage.StorageLevel for different values");
+
public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.precombine.field")
.defaultValue("ts")
@@ -1069,6 +1075,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getSchema();
}
+ public String getTaggedRecordStorageLevel() {
+ return getString(TAGGED_RECORD_STORAGE_LEVEL_VALUE);
+ }
+
public String getInternalSchema() {
return getString(INTERNAL_SCHEMA_STRING);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ac5b8555b05..73c855d5505 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,10 +148,10 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
// Cache the tagged records, so we don't end up computing both
- // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
- inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+ String writeStorageLevel = config.getTaggedRecordStorageLevel();
+ inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
} else {
LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
}