You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/08 02:44:08 UTC

[hudi] branch master updated: [HUDI-2401] Load archived instants for flink streaming reader (#3610)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3a2ea  [HUDI-2401] Load archived instants for flink streaming reader (#3610)
cf3a2ea is described below

commit cf3a2ead32f432757668d49a9138f891110aa9a5
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Sep 8 10:43:54 2021 +0800

    [HUDI-2401] Load archived instants for flink streaming reader (#3610)
---
 .../org/apache/hudi/source/StreamReadMonitoringFunction.java   | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 112dfda..ec56903 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -352,10 +352,16 @@ public class StreamReadMonitoringFunction
       // 1. the start commit is 'earliest';
       // 2. the start instant is archived.
       HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
-      if (!metaClient.getArchivedTimeline().empty()) {
-        Stream<HoodieInstant> instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
+      HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+      if (!archivedCompleteTimeline.empty()) {
+        final String endTs = archivedCompleteTimeline.lastInstant().get().getTimestamp();
+        Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
         if (instantRange != null) {
+          archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), endTs);
           instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
+        } else {
+          final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
+          archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
         }
         return instantStream
             .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());