You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/05 12:55:35 UTC

[4/5] flink git commit: [FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape

This closes #3019.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/411fff58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/411fff58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/411fff58

Branch: refs/heads/master
Commit: 411fff58405804a7f7f79536f8c6885a491dbef6
Parents: ed83b5b
Author: Boris Osipov <bo...@epam.com>
Authored: Fri Dec 16 10:30:33 2016 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100

----------------------------------------------------------------------
 .../webmonitor/WebRuntimeMonitorITCase.java     | 86 ++++++++------------
 1 file changed, 32 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/411fff58/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 853ef14..d8bd6af 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -82,25 +82,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			// Flink w/o a web monitor
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			File logDir = temporaryFolder.newFolder("log");
-			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
-			Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
-			Configuration monitorConfig = new Configuration();
-			monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -228,7 +210,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
 						.useDelimiter("\\A").next();
 
-				// Request the file from the leaading web server
+				// Request the file from the leading web server
 				leaderClient.sendGetRequest("index.html", deadline.timeLeft());
 
 				HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
@@ -352,23 +334,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -430,23 +396,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try {
 			flink = new TestingCluster(new Configuration());
 			flink.start(true);
-
-			ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
-			ActorRef jmActor = flink.jobManagerActors().get().head();
-
-			// Needs to match the leader address from the leader retrieval service
-			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
-			// Web frontend on random port
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-			webMonitor = new WebRuntimeMonitor(
-					config,
-					flink.createLeaderRetrievalService(),
-					jmActorSystem);
-
-			webMonitor.start(jobManagerAddress);
+			webMonitor = startWebRuntimeMonitor(flink);
 
 			try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
 				String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -491,6 +441,34 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		}
 	}
 
+	private WebRuntimeMonitor startWebRuntimeMonitor(
+		TestingCluster flink) throws Exception {
+
+		ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
+		ActorRef jmActor = flink.jobManagerActors().get().head();
+
+		// Needs to match the leader address from the leader retrieval service
+		String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
+
+		File logDir = temporaryFolder.newFolder("log");
+		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+		Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+		// Web frontend on random port
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+
+		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
+			config,
+			flink.createLeaderRetrievalService(),
+			jmActorSystem);
+
+		webMonitor.start(jobManagerAddress);
+		flink.waitForActorsToBeAlive();
+		return webMonitor;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private void waitForLeaderNotification(