You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/12 02:00:30 UTC
[hudi] branch master updated: [HUDI-2415] Add more info log for flink streaming reader (#3642)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5c3e5 [HUDI-2415] Add more info log for flink streaming reader (#3642)
9d5c3e5 is described below
commit 9d5c3e5cb92a4247bb1fc9a4a0e2eb3d2fbce1d6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Sep 12 10:00:17 2021 +0800
[HUDI-2415] Add more info log for flink streaming reader (#3642)
---
.../org/apache/hudi/source/StreamReadMonitoringFunction.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ec56903..c5610d2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -248,6 +248,13 @@ public class StreamReadMonitoringFunction
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
+ if (archivedMetadataList.size() > 0) {
+ LOG.warn(""
+ + "--------------------------------------------------------------------------------\n"
+ + "---------- caution: the reader has fall behind too much from the writer,\n"
+ + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ + "--------------------------------------------------------------------------------");
+ }
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
? mergeList(activeMetadataList, archivedMetadataList)
: activeMetadataList;
@@ -288,6 +295,11 @@ public class StreamReadMonitoringFunction
}
// update the issues instant time
this.issuedInstant = commitToIssue;
+ LOG.info(""
+ + "------------------------------------------------------------\n"
+ + "---------- consumed to instant: {}\n"
+ + "------------------------------------------------------------",
+ commitToIssue);
}
@Override