You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/05 12:55:35 UTC
[4/5] flink git commit: [FLINK-4255] Unstable test
WebRuntimeMonitorITCase.testNoEscape
[FLINK-4255] Unstable test WebRuntimeMonitorITCase.testNoEscape
This closes #3019.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/411fff58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/411fff58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/411fff58
Branch: refs/heads/master
Commit: 411fff58405804a7f7f79536f8c6885a491dbef6
Parents: ed83b5b
Author: Boris Osipov <bo...@epam.com>
Authored: Fri Dec 16 10:30:33 2016 +0300
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 5 12:28:18 2017 +0100
----------------------------------------------------------------------
.../webmonitor/WebRuntimeMonitorITCase.java | 86 ++++++++------------
1 file changed, 32 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/411fff58/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 853ef14..d8bd6af 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -82,25 +82,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
// Flink w/o a web monitor
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- File logDir = temporaryFolder.newFolder("log");
- Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
- Files.createFile(new File(logDir, "jobmanager.out").toPath());
-
- Configuration monitorConfig = new Configuration();
- monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
- monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -228,7 +210,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
String expected = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
.useDelimiter("\\A").next();
- // Request the file from the leaading web server
+ // Request the file from the leading web server
leaderClient.sendGetRequest("index.html", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
@@ -352,23 +334,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- // Web frontend on random port
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
- webMonitor = new WebRuntimeMonitor(
- config,
- flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -430,23 +396,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try {
flink = new TestingCluster(new Configuration());
flink.start(true);
-
- ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
- ActorRef jmActor = flink.jobManagerActors().get().head();
-
- // Needs to match the leader address from the leader retrieval service
- String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
-
- // Web frontend on random port
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
- webMonitor = new WebRuntimeMonitor(
- config,
- flink.createLeaderRetrievalService(),
- jmActorSystem);
-
- webMonitor.start(jobManagerAddress);
+ webMonitor = startWebRuntimeMonitor(flink);
try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) {
String expectedIndex = new Scanner(new File(MAIN_RESOURCES_PATH + "/index.html"))
@@ -491,6 +441,34 @@ public class WebRuntimeMonitorITCase extends TestLogger {
}
}
+ private WebRuntimeMonitor startWebRuntimeMonitor(
+ TestingCluster flink) throws Exception {
+
+ ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head();
+ ActorRef jmActor = flink.jobManagerActors().get().head();
+
+ // Needs to match the leader address from the leader retrieval service
+ String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
+
+ File logDir = temporaryFolder.newFolder("log");
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Files.createFile(new File(logDir, "jobmanager.out").toPath());
+
+ // Web frontend on random port
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+
+ WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
+ config,
+ flink.createLeaderRetrievalService(),
+ jmActorSystem);
+
+ webMonitor.start(jobManagerAddress);
+ flink.waitForActorsToBeAlive();
+ return webMonitor;
+ }
+
// ------------------------------------------------------------------------
private void waitForLeaderNotification(