You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/10 02:25:19 UTC
[hudi] branch master updated: [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6fd21d0f10 [HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)
6fd21d0f10 is described below
commit 6fd21d0f1043d0a06b93332d86e63d7b708fcbe8
Author: aliceyyan <10...@users.noreply.github.com>
AuthorDate: Tue May 10 10:25:13 2022 +0800
[HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)
Co-authored-by: aliceyyan <al...@tencent.com>
---
.../java/org/apache/hudi/source/IncrementalInputSplits.java | 2 +-
.../main/java/org/apache/hudi/table/HoodieTableSource.java | 3 ++-
.../apache/hudi/table/format/mor/MergeOnReadInputSplit.java | 13 ++++++++++++-
3 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 02e0e253cf..94eeefcd36 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -226,7 +226,7 @@ public class IncrementalInputSplits implements Serializable {
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, endInstant,
- metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
+ metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d00eb3e3ec..da4abf0a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -181,6 +181,7 @@ public class HoodieTableSource implements
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
+ .keyBy(inputSplit -> inputSplit.getFileId())
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
@@ -316,7 +317,7 @@ public class HoodieTableSource implements
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
- metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+ metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 156622c303..cde646e41f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -43,6 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
private final long maxCompactionMemoryInBytes;
private final String mergeType;
private final Option<InstantRange> instantRange;
+ private String fileId;
// for streaming reader to record the consumed offset,
// which is the start of next round reading.
@@ -56,7 +57,8 @@ public class MergeOnReadInputSplit implements InputSplit {
String tablePath,
long maxCompactionMemoryInBytes,
String mergeType,
- @Nullable InstantRange instantRange) {
+ @Nullable InstantRange instantRange,
+ String fileId) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
@@ -65,6 +67,15 @@ public class MergeOnReadInputSplit implements InputSplit {
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.mergeType = mergeType;
this.instantRange = Option.ofNullable(instantRange);
+ this.fileId = fileId;
+ }
+
+ public String getFileId() {
+ return fileId;
+ }
+
+ public void setFileId(String fileId) {
+ this.fileId = fileId;
}
public Option<String> getBasePath() {