You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/02 12:36:00 UTC
[flink] 04/04: [FLINK-18010][runtime] Expand HistoryServer logging
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ff98f44f9344c0fb6529ad99a250237d4cd85c9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 15:37:26 2020 +0200
[FLINK-18010][runtime] Expand HistoryServer logging
---
.../webmonitor/history/HistoryServerArchiveFetcher.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index fd6069a..5a41173 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -186,11 +186,13 @@ class HistoryServerArchiveFetcher {
@Override
public void run() {
try {
+ LOG.debug("Starting archive fetching.");
List<ArchiveEvent> events = new ArrayList<>();
Set<String> jobsToRemove = new HashSet<>(cachedArchives);
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
Path refreshDir = refreshLocation.getPath();
FileSystem refreshFS = refreshLocation.getFs();
+ LOG.debug("Checking archive directory {}.", refreshDir);
// contents of /:refreshDir
FileStatus[] jobArchives;
@@ -214,7 +216,11 @@ class HistoryServerArchiveFetcher {
continue;
}
jobsToRemove.remove(jobID);
- if (!cachedArchives.contains(jobID)) {
+
+ if (cachedArchives.contains(jobID)) {
+ LOG.trace("Ignoring archive {} because it was already fetched.", jobArchivePath);
+ } else {
+ LOG.info("Processing archive {}.", jobArchivePath);
try {
for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
String path = archive.getPath();
@@ -224,6 +230,7 @@ class HistoryServerArchiveFetcher {
if (path.equals(JobsOverviewHeaders.URL)) {
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
} else if (path.equals("/joboverview")) { // legacy path
+ LOG.debug("Migrating legacy archive {}", jobArchivePath);
json = convertLegacyJobOverview(json);
target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
} else {
@@ -253,6 +260,7 @@ class HistoryServerArchiveFetcher {
}
events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
cachedArchives.add(jobID);
+ LOG.info("Processing archive {} finished.", jobArchivePath);
} catch (IOException e) {
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
// Make sure we do not include this job in the overview
@@ -281,6 +289,7 @@ class HistoryServerArchiveFetcher {
updateJobOverview(webOverviewDir, webDir);
}
events.forEach(jobArchiveEventListener::accept);
+ LOG.debug("Finished archive fetching.");
} catch (Exception e) {
LOG.error("Critical failure while fetching/processing job archives.", e);
}