You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/16 06:49:04 UTC

flink git commit: [FLINK-6175] Harden HistoryServerTest#testFullArchiveLifecycle

Repository: flink
Updated Branches:
  refs/heads/release-1.3 446d651c1 -> 4e7598d9c


[FLINK-6175] Harden HistoryServerTest#testFullArchiveLifecycle

This closes #3655.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e7598d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e7598d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e7598d9

Branch: refs/heads/release-1.3
Commit: 4e7598d9c21821db8f03a84d0c2e64959b651d38
Parents: 446d651
Author: zentol <ch...@apache.org>
Authored: Thu Mar 23 13:58:03 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue May 16 08:48:17 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServer.java       | 16 +++++++-
 .../history/HistoryServerArchiveFetcher.java    | 10 +++--
 .../webmonitor/history/HistoryServerTest.java   | 42 ++++++++------------
 3 files changed, 39 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e7598d9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 4163581..3337370 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.webmonitor.history;
 
 import io.netty.handler.codec.http.router.Router;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +122,13 @@ public class HistoryServer {
 	}
 
 	public HistoryServer(Configuration config) throws IOException, FlinkException {
+		this(config, new CountDownLatch(0));
+	}
+
+	public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(numFinishedPolls);
+
 		this.config = config;
 		if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
 			LOG.info("Enabling SSL for the history server.");
@@ -163,7 +172,7 @@ public class HistoryServer {
 		}
 
 		long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
-		archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir);
+		archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);
 
 		this.shutdownHook = new Thread() {
 			@Override
@@ -183,6 +192,11 @@ public class HistoryServer {
 		}
 	}
 
+	@VisibleForTesting
+	int getWebPort() {
+		return netty.getServerPort();
+	}
+
 	public void run() {
 		try {
 			start();

http://git-wip-us.apache.org/repos/asf/flink/blob/4e7598d9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 824d6c9..0ff9e02 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -67,9 +68,9 @@ class HistoryServerArchiveFetcher {
 	private final JobArchiveFetcherTask fetcherTask;
 	private final long refreshIntervalMillis;
 
-	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+	HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
 		this.refreshIntervalMillis = refreshIntervalMillis;
-		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir);
+		this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
 		if (LOG.isInfoEnabled()) {
 			for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
 				LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
@@ -100,6 +101,7 @@ class HistoryServerArchiveFetcher {
 	static class JobArchiveFetcherTask extends TimerTask {
 
 		private final List<HistoryServer.RefreshLocation> refreshDirs;
+		private final CountDownLatch numFinishedPolls;
 
 		/** Cache of all available jobs identified by their id. */
 		private final Set<String> cachedArchives;
@@ -110,8 +112,9 @@ class HistoryServerArchiveFetcher {
 
 		private static final String JSON_FILE_ENDING = ".json";
 
-		JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir) {
+		JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
 			this.refreshDirs = checkNotNull(refreshDirs);
+			this.numFinishedPolls = numFinishedPolls;
 			this.cachedArchives = new HashSet<>();
 			this.webDir = checkNotNull(webDir);
 			this.webJobDir = new File(webDir, "jobs");
@@ -213,6 +216,7 @@ class HistoryServerArchiveFetcher {
 			} catch (Exception e) {
 				LOG.error("Critical failure while fetching/processing job archives.", e);
 			}
+			numFinishedPolls.countDown();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e7598d9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 78fbe0b..97943c6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.history;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.testkit.TestActorRef;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
@@ -38,10 +39,12 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 
 public class HistoryServerTest {
@@ -62,42 +65,31 @@ public class HistoryServerTest {
 		config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
 		config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
 
+		config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+
 		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
 		Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
 
-		ActorRef memoryArchivist = actorSystem.actorOf(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath));
+		ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
 		memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
 
 		File archive = new File(jmDirectory, graph.getJobID().toString());
-		for (int x = 0; x < 10 && !archive.exists(); x++) {
-			Thread.sleep(50);
-		}
 		Assert.assertTrue(archive.exists());
 
-		HistoryServer hs = new HistoryServer(config);
+		CountDownLatch numFinishedPolls = new CountDownLatch(1);
+
+		HistoryServer hs = new HistoryServer(config, numFinishedPolls);
 		try {
 			hs.start();
+			String baseUrl = "http://localhost:" + hs.getWebPort();
+			numFinishedPolls.await(10L, TimeUnit.SECONDS);
+
 			ObjectMapper mapper = new ObjectMapper();
-			JsonNode overview = null;
-			for (int x = 0; x < 20; x++) {
-				Thread.sleep(50);
-				String response = getFromHTTP("http://localhost:8082/joboverview");
-				if (response.contains("404 Not Found")) {
-					// file may not be written yet
-					continue;
-				} else {
-					try {
-						overview = mapper.readTree(response);
-						break;
-					} catch (IOException ignored) {
-						// while the file may exist the contents may not have been written yet
-						continue;
-					}
-				}
-			}
-			Assert.assertNotNull("/joboverview.json did not contain valid json", overview);
+			String response = getFromHTTP(baseUrl + "/joboverview");
+			JsonNode overview = mapper.readTree(response);
+
 			String jobID = overview.get("finished").get(0).get("jid").asText();
-			JsonNode jobDetails = mapper.readTree(getFromHTTP("http://localhost:8082/jobs/" + jobID));
+			JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
 			Assert.assertNotNull(jobDetails.get("jid"));
 		} finally {
 			hs.stop();