You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:38 UTC
[hudi] 28/45: [HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 005e913403824fc6d5494bbefe8a370712656782
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Thu Nov 24 13:17:21 2022 +0800
[HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 21 ++++++++++-----------
1 file changed, 10 insertions(+), 11 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 578bb10db5..4a3674ec29 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -511,28 +511,27 @@ public class StreamWriteOperatorCoordinator
}
setMinEventTime();
doCommit(instant, writeResults);
- resetMinEventTime();
return true;
}
public void setMinEventTime() {
if (commitEventTimeEnable) {
- LOG.info("[setMinEventTime] receive event time for current commit: {} ", Arrays.stream(eventBuffer).map(WriteMetadataEvent::getMaxEventTime).map(String::valueOf)
- .collect(Collectors.joining(", ")));
- this.minEventTime = Arrays.stream(eventBuffer)
+ List<Long> eventTimes = Arrays.stream(eventBuffer)
.filter(Objects::nonNull)
- .filter(maxEventTime -> maxEventTime.getMaxEventTime() > 0)
.map(WriteMetadataEvent::getMaxEventTime)
- .min(Comparator.naturalOrder())
- .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE);
+ .filter(maxEventTime -> maxEventTime > 0)
+ .collect(Collectors.toList());
+
+ if (!eventTimes.isEmpty()) {
+ LOG.info("[setMinEventTime] receive event time for current commit: {} ",
+ eventTimes.stream().map(String::valueOf).collect(Collectors.joining(", ")));
+ this.minEventTime = eventTimes.stream().min(Comparator.naturalOrder())
+ .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE);
+ }
LOG.info("[setMinEventTime] minEventTime: {} ", this.minEventTime);
}
}
- public void resetMinEventTime() {
- this.minEventTime = Long.MAX_VALUE;
- }
-
/**
* Performs the actual commit action.
*/