Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/27 03:09:37 UTC
[hudi] branch master updated: [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 72fa19bcc9 [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
72fa19bcc9 is described below
commit 72fa19bcc9586e693f4afc99232c2467e93d2148
Author: cxzl25 <cx...@users.noreply.github.com>
AuthorDate: Mon Jun 27 11:09:30 2022 +0800
[HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
---
.../client/clustering/run/strategy/JavaExecutionStrategy.java | 2 ++
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 2 ++
.../java/org/apache/hudi/sink/clustering/ClusteringOperator.java | 2 ++
.../src/main/java/org/apache/hudi/table/format/FormatUtils.java | 1 +
.../src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | 9 +++++++--
5 files changed, 14 insertions(+), 2 deletions(-)
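For context, the patch applies the same wiring in each engine: forward the common-config spillable disk map settings into the merged log record scanner builder. Below is a minimal sketch of that wiring, not part of the commit; it assumes only classes and builder methods already visible in the diff (HoodieMergedLogRecordScanner, HoodieWriteConfig, getCommonConfig()), and the helper name withDiskMapConfig is illustrative.

    import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class SpillableDiskMapWiring {
      // Mirrors the two builder calls added by this commit: pick the spillable
      // disk map implementation (BitCask or RocksDB) from the common config,
      // and forward the BitCask compression flag alongside it.
      static HoodieMergedLogRecordScanner.Builder withDiskMapConfig(
          HoodieMergedLogRecordScanner.Builder builder, HoodieWriteConfig config) {
        return builder
            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
            .withBitCaskDiskMapCompressionEnabled(
                config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
      }
    }

Before this change, the clustering and Flink paths below always built the scanner with the default disk map, so the user-facing common configs had no effect there.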
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 233c70ecf9..adcbb874e8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -191,6 +191,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
+ .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+ .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index e09457f0e5..df0ad6e2b8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -221,6 +221,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
+ .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+ .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 676532402c..ca48dcf6cb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -269,6 +269,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
.withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+ .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+ .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index eb058597f8..8adbde355c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -136,6 +136,7 @@ public class FormatUtils {
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
.withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+ .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.withInstantRange(split.getInstantRange())
.withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 2fdb9b882e..cd38e233e1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
@@ -361,7 +361,12 @@ private object HoodieMergeOnReadRDD {
.withSpillableMapBasePath(
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
+ .withDiskMapType(
+ hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+ HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+ .withBitCaskDiskMapCompressionEnabled(
+ hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+ HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
if (logFiles.nonEmpty) {
logRecordScannerBuilder.withPartition(
getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
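On the Spark read path patched above, the scanner settings come from the Hadoop configuration rather than a write config. A minimal sketch of setting those keys before a merge-on-read query follows; it is not part of the commit, the class name is illustrative, and it assumes the DiskMapType enum lives in ExternalSpillableMap with BITCASK/ROCKS_DB values, consistent with the getEnum/defaultValue usage in the hunk above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieCommonConfig;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;

    public class MorReadDiskMapConfig {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Select the RocksDB-backed spillable map instead of the default BitCask map.
        hadoopConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
            ExternalSpillableMap.DiskMapType.ROCKS_DB);
        // The compression flag is only consulted when the BitCask map is chosen.
        hadoopConf.setBoolean(
            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), true);
        // Hand hadoopConf to the Spark session reading the MOR table; the patched
        // HoodieMergeOnReadRDD above picks these values up via getEnum/getBoolean.
      }
    }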