You are viewing a plain text version of this content. The canonical link for it is on the original archive page (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/28 00:37:42 UTC
[hudi] branch master updated: [HUDI-4151] flink split_reader supports rocksdb (#5675)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 93fe5a497e [HUDI-4151] flink split_reader supports rocksdb (#5675)
93fe5a497e is described below
commit 93fe5a497e8f2f97d90f3dd66d966f9f4cc24bc9
Author: Bo Cui <cu...@163.com>
AuthorDate: Sat May 28 08:37:34 2022 +0800
[HUDI-4151] flink split_reader supports rocksdb (#5675)
* [HUDI-4151] flink split_reader supports rocksdb
---
.../org/apache/hudi/table/format/FormatUtils.java | 35 ++++++++++------------
.../table/format/mor/MergeOnReadInputFormat.java | 8 ++---
2 files changed, 20 insertions(+), 23 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 478f94cb71..eb058597f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -120,38 +121,34 @@ public class FormatUtils {
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
- Configuration config,
- boolean withOperationField) {
- FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+ org.apache.flink.configuration.Configuration flinkConf,
+ Configuration hadoopConf) {
+ HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
+ FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
.withLogFilePaths(split.getLogPaths().get())
.withReaderSchema(logSchema)
.withLatestInstantTime(split.getLatestCommit())
- .withReadBlocksLazily(
- string2Boolean(
- config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
- HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
.withReverseReader(false)
- .withBufferSize(
- config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
- HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
- .withSpillableMapBasePath(
- config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
- HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+ .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.withInstantRange(split.getInstantRange())
- .withOperationField(withOperationField)
+ .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.build();
}
private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
MergeOnReadInputSplit split,
Schema logSchema,
- Configuration config,
+ org.apache.flink.configuration.Configuration flinkConf,
+ Configuration hadoopConf,
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
- FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+ FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
return HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
@@ -160,11 +157,11 @@ public class FormatUtils {
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
string2Boolean(
- config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+ flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReverseReader(false)
.withBufferSize(
- config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+ flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withInstantRange(split.getInstantRange())
.withLogRecordScannerCallback(callback)
@@ -198,7 +195,7 @@ public class FormatUtils {
Functions.noop());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
- this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+ this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf,
record -> executor.getQueue().insertRecord(record));
// Start reading and buffering
this.executor.startProducers();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4f2de3648e..8eaa9d0b88 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -192,6 +192,7 @@ public class MergeOnReadInputFormat
getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
this.iterator = new MergeIterator(
+ conf,
hadoopConf,
split,
this.tableState.getRowType(),
@@ -200,7 +201,6 @@ public class MergeOnReadInputFormat
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
this.emitDelete,
- this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
this.tableState.getOperationPos(),
getFullSchemaReader(split.getBasePath().get()));
} else {
@@ -323,7 +323,7 @@ public class MergeOnReadInputFormat
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
- final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
+ final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
@@ -639,6 +639,7 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
MergeIterator(
+ Configuration flinkConf,
org.apache.hadoop.conf.Configuration hadoopConf,
MergeOnReadInputSplit split,
RowType tableRowType,
@@ -647,12 +648,11 @@ public class MergeOnReadInputFormat
Schema requiredSchema,
int[] requiredPos,
boolean emitDelete,
- boolean withOperationField,
int operationPos,
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
- this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
+ this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;