Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/28 00:37:42 UTC

[hudi] branch master updated: [HUDI-4151] flink split_reader supports rocksdb (#5675)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 93fe5a497e [HUDI-4151] flink split_reader supports rocksdb (#5675)
93fe5a497e is described below

commit 93fe5a497e8f2f97d90f3dd66d966f9f4cc24bc9
Author: Bo Cui <cu...@163.com>
AuthorDate: Sat May 28 08:37:34 2022 +0800

    [HUDI-4151] flink split_reader supports rocksdb (#5675)
    
    * [HUDI-4151] flink split_reader supports rocksdb
---
 .../org/apache/hudi/table/format/FormatUtils.java  | 35 ++++++++++------------
 .../table/format/mor/MergeOnReadInputFormat.java   |  8 ++---
 2 files changed, 20 insertions(+), 23 deletions(-)
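
In short: FormatUtils.logScanner and FormatUtils.unMergedLogScanner now take the Flink configuration alongside the Hadoop configuration. The merged scanner builds a HoodieWriteConfig from the Flink configuration and reads the lazy block read flag, the DFS stream buffer size, the spillable map base path and, new with this patch, the spillable disk map type from it, so the read-path spillable map can be backed by RocksDB. The operation-field flag is likewise derived inside FormatUtils from FlinkOptions.CHANGELOG_ENABLED instead of being passed by callers. A minimal probe of that wiring is sketched below; the table path and the option key are illustrative, passing the hoodie.* option through the Flink configuration is an assumption rather than something this diff shows, and additional options (e.g. a table schema) may be required in practice.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.StreamerUtil;

    public class SpillableMapConfigProbe {
      public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // Placeholder table path; a real job sets this from the table definition.
        flinkConf.setString(FlinkOptions.PATH, "file:///tmp/hudi/t1");
        // Assumption: the hoodie.* option is passed through into the write config.
        flinkConf.setString("hoodie.common.spillable.diskmap.type", "ROCKS_DB");

        // Same write config the merged log scanner now consults (see FormatUtils below).
        HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
        System.out.println(writeConfig.getCommonConfig().getSpillableDiskMapType());
        System.out.println(writeConfig.getSpillableMapBasePath());
        System.out.println(writeConfig.getMaxDFSStreamBufferSize());
      }
    }
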

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 478f94cb71..eb058597f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
 import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -120,38 +121,34 @@ public class FormatUtils {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
-      boolean withOperationField) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf) {
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
         .withLogFilePaths(split.getLogPaths().get())
         .withReaderSchema(logSchema)
         .withLatestInstantTime(split.getLatestCommit())
-        .withReadBlocksLazily(
-            string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                    HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
-        .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-                HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
-        .withSpillableMapBasePath(
-            config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-                HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
-        .withOperationField(withOperationField)
+        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
         .build();
   }
 
   private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config,
+      org.apache.flink.configuration.Configuration flinkConf,
+      Configuration hadoopConf,
       HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieUnMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
         .withBasePath(split.getTablePath())
@@ -160,11 +157,11 @@ public class FormatUtils {
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
             string2Boolean(
-                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+                flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
                     HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         .withBufferSize(
-            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
                 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         .withInstantRange(split.getInstantRange())
         .withLogRecordScannerCallback(callback)
@@ -198,7 +195,7 @@ public class FormatUtils {
           Functions.noop());
       // Consumer of this record reader
       this.iterator = this.executor.getQueue().iterator();
-      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf,
           record -> executor.getQueue().insertRecord(record));
       // Start reading and buffering
       this.executor.startProducers();
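
Inside the builder, the practical difference is withDiskMapType: the merged scanner no longer spills to the default BITCASK file map unconditionally but uses whatever the common config reports. A tiny sketch of the enum involved follows; the fully qualified name is assumed from hudi-common and is not shown in this diff.

    import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType;

    public class DiskMapTypeSketch {
      public static void main(String[] args) {
        // getSpillableDiskMapType() returns this enum; ROCKS_DB makes the merged
        // log scanner spill overflow records to a RocksDB-backed map instead of
        // the default BITCASK file-based map.
        DiskMapType diskMapType = DiskMapType.valueOf("ROCKS_DB");
        System.out.println("Spillable disk map type: " + diskMapType);
      }
    }
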
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4f2de3648e..8eaa9d0b88 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -192,6 +192,7 @@ public class MergeOnReadInputFormat
           getLogFileIterator(split));
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       this.iterator = new MergeIterator(
+          conf,
           hadoopConf,
           split,
           this.tableState.getRowType(),
@@ -200,7 +201,6 @@ public class MergeOnReadInputFormat
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
           this.emitDelete,
-          this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
           this.tableState.getOperationPos(),
           getFullSchemaReader(split.getBasePath().get()));
     } else {
@@ -323,7 +323,7 @@ public class MergeOnReadInputFormat
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
     final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -639,6 +639,7 @@ public class MergeOnReadInputFormat
     private RowData currentRecord;
 
     MergeIterator(
+        Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
         RowType tableRowType,
@@ -647,12 +648,11 @@ public class MergeOnReadInputFormat
         Schema requiredSchema,
         int[] requiredPos,
         boolean emitDelete,
-        boolean withOperationField,
         int operationPos,
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
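
End to end, the user-facing switch stays the standard Hudi diskmap option; what this patch adds is that the Flink MOR split reader is expected to honor it as well. A hypothetical Flink SQL example wrapped in Java is sketched below; the table name, schema and path are placeholders, and whether the hoodie.* option reaches the reader in a given deployment should be verified.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RocksDbSplitReaderExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        // Placeholder schema and path; the diskmap option below is the standard
        // Hudi common config and is assumed to be passed through to the reader.
        tEnv.executeSql(
            "CREATE TABLE t1 ("
                + "  id INT,"
                + "  name STRING,"
                + "  ts TIMESTAMP(3),"
                + "  PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'hudi',"
                + "  'path' = 'file:///tmp/hudi/t1',"
                + "  'table.type' = 'MERGE_ON_READ',"
                + "  'read.streaming.enabled' = 'true',"
                + "  'hoodie.common.spillable.diskmap.type' = 'ROCKS_DB'"
                + ")");
        // Streaming read over the MOR table; the split reader merges log records
        // using the spillable map configured above.
        tEnv.executeSql("SELECT * FROM t1").print();
      }
    }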