You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/09/05 07:43:01 UTC

[flink] branch release-1.8 updated: [FLINK-13892][hs] Harden HistoryServerTest

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new d540794  [FLINK-13892][hs] Harden HistoryServerTest
d540794 is described below

commit d5407947f23f7a6a9e8a8dbc7d2e78ea6257b7a8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 29 13:15:08 2019 +0200

    [FLINK-13892][hs] Harden HistoryServerTest
---
 .../flink/runtime/webmonitor/history/HistoryServer.java      |  6 +++---
 .../webmonitor/history/HistoryServerArchiveFetcher.java      | 12 ++++++------
 .../flink/runtime/webmonitor/history/HistoryServerTest.java  |  7 ++++---
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index a93fe93..f1a5330 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -140,9 +140,9 @@ public class HistoryServer {
 		this(config, new CountDownLatch(0));
 	}
 
-	public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
+	public HistoryServer(Configuration config, CountDownLatch numArchivedJobs) throws IOException, FlinkException {
 		Preconditions.checkNotNull(config);
-		Preconditions.checkNotNull(numFinishedPolls);
+		Preconditions.checkNotNull(numArchivedJobs);
 
 		this.config = config;
 		if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
@@ -187,7 +187,7 @@ public class HistoryServer {
 		}
 
 		long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
-		archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
+		archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numArchivedJobs);
 
 		this.shutdownHook = ShutdownHookUtil.addShutdownHook(
 			HistoryServer.this::stop,
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index f95b14c..47888cd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -79,9 +79,9 @@ class HistoryServerArchiveFetcher {
 	private final JobArchiveFetcherTask fetcherTask;
 	private final long refreshIntervalMillis;
 
-	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
 		this.refreshIntervalMillis = refreshIntervalMillis;
-		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
+		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numArchivedJobs);
 		if (LOG.isInfoEnabled()) {
 			for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
 				LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
@@ -112,7 +112,7 @@ class HistoryServerArchiveFetcher {
 	static class JobArchiveFetcherTask extends TimerTask {
 
 		private final List<HistoryServer.RefreshLocation> refreshDirs;
-		private final CountDownLatch numFinishedPolls;
+		private final CountDownLatch numArchivedJobs;
 
 		/** Cache of all available jobs identified by their id. */
 		private final Set<String> cachedArchives;
@@ -123,9 +123,9 @@ class HistoryServerArchiveFetcher {
 
 		private static final String JSON_FILE_ENDING = ".json";
 
-		JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+		JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
 			this.refreshDirs = checkNotNull(refreshDirs);
-			this.numFinishedPolls = numFinishedPolls;
+			this.numArchivedJobs = numArchivedJobs;
 			this.cachedArchives = new HashSet<>();
 			this.webDir = checkNotNull(webDir);
 			this.webJobDir = new File(webDir, "jobs");
@@ -200,6 +200,7 @@ class HistoryServerArchiveFetcher {
 									}
 								}
 								updateOverview = true;
+								numArchivedJobs.countDown();
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
 								// Make sure we attempt to fetch the archive again
@@ -228,7 +229,6 @@ class HistoryServerArchiveFetcher {
 			} catch (Exception e) {
 				LOG.error("Critical failure while fetching/processing job archives.", e);
 			}
-			numFinishedPolls.countDown();
 		}
 	}
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 03bcfa8..f88ae01 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -113,11 +113,12 @@ public class HistoryServerTest extends TestLogger {
 		}
 		createLegacyArchive(jmDirectory.toPath());
 
-		CountDownLatch numFinishedPolls = new CountDownLatch(1);
+		CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs + 1);
 
 		Configuration historyServerConfig = new Configuration();
 		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
 		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+		historyServerConfig.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 100L);
 
 		historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
@@ -128,11 +129,11 @@ public class HistoryServerTest extends TestLogger {
 			archives = jmDirectory.listFiles();
 		}
 
-		HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
+		HistoryServer hs = new HistoryServer(historyServerConfig, numExpectedArchivedJobs);
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
-			numFinishedPolls.await(10L, TimeUnit.SECONDS);
+			numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
 
 			ObjectMapper mapper = new ObjectMapper();
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);