You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:38 UTC

[hudi] 28/45: [HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 005e913403824fc6d5494bbefe8a370712656782
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Thu Nov 24 13:17:21 2022 +0800

    [HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java   | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 578bb10db5..4a3674ec29 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -511,28 +511,27 @@ public class StreamWriteOperatorCoordinator
     }
     setMinEventTime();
     doCommit(instant, writeResults);
-    resetMinEventTime();
     return true;
   }
 
   public void setMinEventTime() {
     if (commitEventTimeEnable) {
-      LOG.info("[setMinEventTime] receive event time for current commit: {} ", Arrays.stream(eventBuffer).map(WriteMetadataEvent::getMaxEventTime).map(String::valueOf)
-          .collect(Collectors.joining(", ")));
-      this.minEventTime = Arrays.stream(eventBuffer)
+      List<Long> eventTimes = Arrays.stream(eventBuffer)
           .filter(Objects::nonNull)
-          .filter(maxEventTime -> maxEventTime.getMaxEventTime() > 0)
           .map(WriteMetadataEvent::getMaxEventTime)
-          .min(Comparator.naturalOrder())
-          .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE);
+          .filter(maxEventTime -> maxEventTime > 0)
+          .collect(Collectors.toList());
+
+      if (!eventTimes.isEmpty()) {
+        LOG.info("[setMinEventTime] receive event time for current commit: {} ",
+            eventTimes.stream().map(String::valueOf).collect(Collectors.joining(", ")));
+        this.minEventTime = eventTimes.stream().min(Comparator.naturalOrder())
+            .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE);
+      }
       LOG.info("[setMinEventTime] minEventTime: {} ", this.minEventTime);
     }
   }
 
-  public void resetMinEventTime() {
-    this.minEventTime = Long.MAX_VALUE;
-  }
-
   /**
    * Performs the actual commit action.
    */