You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/02 12:36:00 UTC

[flink] 04/04: [FLINK-18010][runtime] Expand HistoryServer logging

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ff98f44f9344c0fb6529ad99a250237d4cd85c9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 15:37:26 2020 +0200

    [FLINK-18010][runtime] Expand HistoryServer logging
---
 .../webmonitor/history/HistoryServerArchiveFetcher.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index fd6069a..5a41173 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -186,11 +186,13 @@ class HistoryServerArchiveFetcher {
 		@Override
 		public void run() {
 			try {
+				LOG.debug("Starting archive fetching.");
 				List<ArchiveEvent> events = new ArrayList<>();
 				Set<String> jobsToRemove = new HashSet<>(cachedArchives);
 				for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
 					Path refreshDir = refreshLocation.getPath();
 					FileSystem refreshFS = refreshLocation.getFs();
+					LOG.debug("Checking archive directory {}.", refreshDir);
 
 					// contents of /:refreshDir
 					FileStatus[] jobArchives;
@@ -214,7 +216,11 @@ class HistoryServerArchiveFetcher {
 							continue;
 						}
 						jobsToRemove.remove(jobID);
-						if (!cachedArchives.contains(jobID)) {
+
+						if (cachedArchives.contains(jobID)) {
+							LOG.trace("Ignoring archive {} because it was already fetched.", jobArchivePath);
+						} else {
+							LOG.info("Processing archive {}.", jobArchivePath);
 							try {
 								for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
 									String path = archive.getPath();
@@ -224,6 +230,7 @@ class HistoryServerArchiveFetcher {
 									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else if (path.equals("/joboverview")) { // legacy path
+										LOG.debug("Migrating legacy archive {}", jobArchivePath);
 										json = convertLegacyJobOverview(json);
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
@@ -253,6 +260,7 @@ class HistoryServerArchiveFetcher {
 								}
 								events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
 								cachedArchives.add(jobID);
+								LOG.info("Processing archive {} finished.", jobArchivePath);
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
 								// Make sure we do not include this job in the overview
@@ -281,6 +289,7 @@ class HistoryServerArchiveFetcher {
 					updateJobOverview(webOverviewDir, webDir);
 				}
 				events.forEach(jobArchiveEventListener::accept);
+				LOG.debug("Finished archive fetching.");
 			} catch (Exception e) {
 				LOG.error("Critical failure while fetching/processing job archives.", e);
 			}