You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/12 02:00:30 UTC

[hudi] branch master updated: [HUDI-2415] Add more info log for flink streaming reader (#3642)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5c3e5  [HUDI-2415]  Add more info log for flink streaming reader (#3642)
9d5c3e5 is described below

commit 9d5c3e5cb92a4247bb1fc9a4a0e2eb3d2fbce1d6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Sep 12 10:00:17 2021 +0800

    [HUDI-2415]  Add more info log for flink streaming reader (#3642)
---
 .../org/apache/hudi/source/StreamReadMonitoringFunction.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ec56903..c5610d2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -248,6 +248,13 @@ public class StreamReadMonitoringFunction
     List<HoodieCommitMetadata> activeMetadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
     List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
+    if (archivedMetadataList.size() > 0) {
+      LOG.warn(""
+          + "--------------------------------------------------------------------------------\n"
+          + "---------- caution: the reader has fall behind too much from the writer,\n"
+          + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+          + "--------------------------------------------------------------------------------");
+    }
     List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
         ? mergeList(activeMetadataList, archivedMetadataList)
         : activeMetadataList;
@@ -288,6 +295,11 @@ public class StreamReadMonitoringFunction
     }
     // update the issues instant time
     this.issuedInstant = commitToIssue;
+    LOG.info(""
+        + "------------------------------------------------------------\n"
+        + "---------- consumed to instant: {}\n"
+        + "------------------------------------------------------------",
+        commitToIssue);
   }
 
   @Override