You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/09/05 07:42:17 UTC
[flink] branch release-1.9 updated: [FLINK-13892][hs] Harden HistoryServerTest
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 2f26f89 [FLINK-13892][hs] Harden HistoryServerTest
2f26f89 is described below
commit 2f26f894be9905efa5cc90e28479ef8d96a4fc8d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Aug 29 13:15:08 2019 +0200
[FLINK-13892][hs] Harden HistoryServerTest
---
.../flink/runtime/webmonitor/history/HistoryServer.java | 6 +++---
.../webmonitor/history/HistoryServerArchiveFetcher.java | 12 ++++++------
.../flink/runtime/webmonitor/history/HistoryServerTest.java | 7 ++++---
3 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index e907836..37407bb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -137,9 +137,9 @@ public class HistoryServer {
this(config, new CountDownLatch(0));
}
- public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
+ public HistoryServer(Configuration config, CountDownLatch numArchivedJobs) throws IOException, FlinkException {
Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(numFinishedPolls);
+ Preconditions.checkNotNull(numArchivedJobs);
this.config = config;
if (HistoryServerUtils.isSSLEnabled(config)) {
@@ -184,7 +184,7 @@ public class HistoryServer {
}
long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
- archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
+ archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numArchivedJobs);
this.shutdownHook = ShutdownHookUtil.addShutdownHook(
HistoryServer.this::stop,
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index f95b14c..47888cd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -79,9 +79,9 @@ class HistoryServerArchiveFetcher {
private final JobArchiveFetcherTask fetcherTask;
private final long refreshIntervalMillis;
- HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+ HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
this.refreshIntervalMillis = refreshIntervalMillis;
- this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
+ this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numArchivedJobs);
if (LOG.isInfoEnabled()) {
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
@@ -112,7 +112,7 @@ class HistoryServerArchiveFetcher {
static class JobArchiveFetcherTask extends TimerTask {
private final List<HistoryServer.RefreshLocation> refreshDirs;
- private final CountDownLatch numFinishedPolls;
+ private final CountDownLatch numArchivedJobs;
/** Cache of all available jobs identified by their id. */
private final Set<String> cachedArchives;
@@ -123,9 +123,9 @@ class HistoryServerArchiveFetcher {
private static final String JSON_FILE_ENDING = ".json";
- JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
+ JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numArchivedJobs) {
this.refreshDirs = checkNotNull(refreshDirs);
- this.numFinishedPolls = numFinishedPolls;
+ this.numArchivedJobs = numArchivedJobs;
this.cachedArchives = new HashSet<>();
this.webDir = checkNotNull(webDir);
this.webJobDir = new File(webDir, "jobs");
@@ -200,6 +200,7 @@ class HistoryServerArchiveFetcher {
}
}
updateOverview = true;
+ numArchivedJobs.countDown();
} catch (IOException e) {
LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
// Make sure we attempt to fetch the archive again
@@ -228,7 +229,6 @@ class HistoryServerArchiveFetcher {
} catch (Exception e) {
LOG.error("Critical failure while fetching/processing job archives.", e);
}
- numFinishedPolls.countDown();
}
}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a1ace56..fa932dc 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -116,11 +116,12 @@ public class HistoryServerTest extends TestLogger {
}
createLegacyArchive(jmDirectory.toPath());
- CountDownLatch numFinishedPolls = new CountDownLatch(1);
+ CountDownLatch numExpectedArchivedJobs = new CountDownLatch(numJobs + 1);
Configuration historyServerConfig = new Configuration();
historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+ historyServerConfig.setLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL, 100L);
historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
@@ -131,11 +132,11 @@ public class HistoryServerTest extends TestLogger {
archives = jmDirectory.listFiles();
}
- HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
+ HistoryServer hs = new HistoryServer(historyServerConfig, numExpectedArchivedJobs);
try {
hs.start();
String baseUrl = "http://localhost:" + hs.getWebPort();
- numFinishedPolls.await(10L, TimeUnit.SECONDS);
+ numExpectedArchivedJobs.await(10L, TimeUnit.SECONDS);
ObjectMapper mapper = new ObjectMapper();
String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);