You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/16 07:35:12 UTC
[2/2] flink git commit: [FLINK-6175] Harden HistoryServerTest#testFullArchiveLifecycle
[FLINK-6175] Harden HistoryServerTest#testFullArchiveLifecycle
This closes #3655.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ba5cd5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ba5cd5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ba5cd5a
Branch: refs/heads/master
Commit: 5ba5cd5ac9ab00da18b723177fb8c977c56e009c
Parents: b54a72e
Author: zentol <ch...@apache.org>
Authored: Thu Mar 23 13:58:03 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue May 16 08:53:33 2017 +0200
----------------------------------------------------------------------
.../webmonitor/history/HistoryServer.java | 16 +++++++-
.../history/HistoryServerArchiveFetcher.java | 10 +++--
.../webmonitor/history/HistoryServerTest.java | 42 ++++++++------------
3 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ba5cd5a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 4163581..3337370 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.history;
import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +122,13 @@ public class HistoryServer {
}
public HistoryServer(Configuration config) throws IOException, FlinkException {
+ this(config, new CountDownLatch(0));
+ }
+
+ public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(numFinishedPolls);
+
this.config = config;
if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
LOG.info("Enabling SSL for the history server.");
@@ -163,7 +172,7 @@ public class HistoryServer {
}
long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
- archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir);
+ archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
this.shutdownHook = new Thread() {
@Override
@@ -183,6 +192,11 @@ public class HistoryServer {
}
}
+ @VisibleForTesting
+ int getWebPort() {
+ return netty.getServerPort();
+ }
+
public void run() {
try {
start();
http://git-wip-us.apache.org/repos/asf/flink/blob/5ba5cd5a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 824d6c9..0ff9e02 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -67,9 +68,9 @@ class HistoryServerArchiveFetcher {
private final JobArchiveFetcherTask fetcherTask;
private final long refreshIntervalMillis;
- HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+ HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
this.refreshIntervalMillis = refreshIntervalMillis;
- this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+ this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
if (LOG.isInfoEnabled()) {
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
@@ -100,6 +101,7 @@ class HistoryServerArchiveFetcher {
static class JobArchiveFetcherTask extends TimerTask {
private final List<HistoryServer.RefreshLocation> refreshDirs;
+ private final CountDownLatch numFinishedPolls;
/** Cache of all available jobs identified by their id. */
private final Set<String> cachedArchives;
@@ -110,8 +112,9 @@ class HistoryServerArchiveFetcher {
private static final String JSON_FILE_ENDING = ".json";
- JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+ JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
this.refreshDirs = checkNotNull(refreshDirs);
+ this.numFinishedPolls = numFinishedPolls;
this.cachedArchives = new HashSet<>();
this.webDir = checkNotNull(webDir);
this.webJobDir = new File(webDir, "jobs");
@@ -213,6 +216,7 @@ class HistoryServerArchiveFetcher {
} catch (Exception e) {
LOG.error("Critical failure while fetching/processing job archives.", e);
}
+ numFinishedPolls.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ba5cd5a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 78fbe0b..97943c6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.history;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.testkit.TestActorRef;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
@@ -38,10 +39,12 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
-import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import scala.Option;
public class HistoryServerTest {
@@ -62,42 +65,31 @@ public class HistoryServerTest {
config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+ config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+
ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
- ActorRef memoryArchivist = actorSystem.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath));
+ ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
File archive = new File(jmDirectory, graph.getJobID().toString());
- for (int x = 0; x < 10 && !archive.exists(); x++) {
- Thread.sleep(50);
- }
Assert.assertTrue(archive.exists());
- HistoryServer hs = new HistoryServer(config);
+ CountDownLatch numFinishedPolls = new CountDownLatch(1);
+
+ HistoryServer hs = new HistoryServer(config, numFinishedPolls);
try {
hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ numFinishedPolls.await(10L, TimeUnit.SECONDS);
+
ObjectMapper mapper = new ObjectMapper();
- JsonNode overview = null;
- for (int x = 0; x < 20; x++) {
- Thread.sleep(50);
- String response = getFromHTTP("http://localhost:8082/joboverview");
- if (response.contains("404 Not Found")) {
- // file may not be written yet
- continue;
- } else {
- try {
- overview = mapper.readTree(response);
- break;
- } catch (IOException ignored) {
- // while the file may exist the contents may not have been written yet
- continue;
- }
- }
- }
- Assert.assertNotNull("/joboverview.json did not contain valid json", overview);
+ String response = getFromHTTP(baseUrl + "/joboverview");
+ JsonNode overview = mapper.readTree(response);
+
String jobID = overview.get("finished").get(0).get("jid").asText();
- JsonNode jobDetails = mapper.readTree(getFromHTTP("http://localhost:8082/jobs/" + jobID));
+ JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
Assert.assertNotNull(jobDetails.get("jid"));
} finally {
hs.stop();