Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/27 03:09:37 UTC

[hudi] branch master updated: [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 72fa19bcc9 [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
72fa19bcc9 is described below

commit 72fa19bcc9586e693f4afc99232c2467e93d2148
Author: cxzl25 <cx...@users.noreply.github.com>
AuthorDate: Mon Jun 27 11:09:30 2022 +0800

    [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
---
 .../client/clustering/run/strategy/JavaExecutionStrategy.java    | 2 ++
 .../run/strategy/MultipleSparkJobExecutionStrategy.java          | 2 ++
 .../java/org/apache/hudi/sink/clustering/ClusteringOperator.java | 2 ++
 .../src/main/java/org/apache/hudi/table/format/FormatUtils.java  | 1 +
 .../src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala    | 9 +++++++--
 5 files changed, 14 insertions(+), 2 deletions(-)
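
In short, the commit threads two existing spillable-map options through every
HoodieMergedLogRecordScanner construction site that previously dropped them:
the disk map type (BitCask or RocksDB) and, for a BitCask-backed map, whether
its spill files are compressed. A minimal sketch of the resulting builder
call, modeled on the Java clustering call site in the diff below (`config` is
the HoodieWriteConfig and `clusteringOp` the clustering operation assumed from
that context; other builder options are elided):

    // Sketch only, not verbatim from the commit. Mirrors the
    // JavaExecutionStrategy call site after this change.
    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
        .withBufferSize(config.getMaxDFSStreamBufferSize())
        .withSpillableMapBasePath(config.getSpillableMapBasePath())
        .withPartition(clusteringOp.getPartitionPath())
        // Newly forwarded options: which on-disk map backs the spillable map,
        // and whether a BitCask-backed map compresses its spill files.
        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
        .withBitCaskDiskMapCompressionEnabled(
            config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
        .build();
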

diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 233c70ecf9..adcbb874e8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -191,6 +191,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(config.getSpillableMapBasePath())
             .withPartition(clusteringOp.getPartitionPath())
+            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+            .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
         Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index e09457f0e5..df0ad6e2b8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -221,6 +221,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
               .withBufferSize(config.getMaxDFSStreamBufferSize())
               .withSpillableMapBasePath(config.getSpillableMapBasePath())
               .withPartition(clusteringOp.getPartitionPath())
+              .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+              .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
               .build();
 
           Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 676532402c..ca48dcf6cb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -269,6 +269,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
             .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
             .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+            .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+            .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index eb058597f8..8adbde355c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -136,6 +136,7 @@ public class FormatUtils {
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
         .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 2fdb9b882e..cd38e233e1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
@@ -361,7 +361,12 @@ private object HoodieMergeOnReadRDD {
         .withSpillableMapBasePath(
           hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
             HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
+        .withDiskMapType(
+          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+        .withBitCaskDiskMapCompressionEnabled(
+          hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
       if (logFiles.nonEmpty) {
         logRecordScannerBuilder.withPartition(
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
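
For readers driving this from the Spark side, the two builder options above
are read from HoodieCommonConfig settings in the Hadoop configuration. A
hedged usage sketch in Java, assuming (as in the Spark datasource path) that
read options are propagated into the Hadoop configuration consumed by
HoodieMergeOnReadRDD, and that `spark` and `basePath` are an existing
SparkSession and table path; verify the literal key strings against
HoodieCommonConfig in your Hudi release:

    // Sketch only: select a RocksDB-backed spillable disk map and disable
    // BitCask compression for a merge-on-read query.
    Dataset<Row> df = spark.read().format("hudi")
        .option("hoodie.common.spillable.diskmap.type", "ROCKS_DB")
        .option("hoodie.common.diskmap.compression.enabled", "false")
        .load(basePath);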