You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:28 UTC
[8/9] flink git commit: [FLINK-8703][tests] Port WebFrontendITCase to
MiniClusterResource
[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource
This closes #5665.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/420190d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/420190d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/420190d8
Branch: refs/heads/master
Commit: 420190d85cdab414fa24db9c161852fcbcb81451
Parents: 31f4036
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:46 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200
----------------------------------------------------------------------
.../runtime/webmonitor/WebFrontendITCase.java | 202 +++++++++++++------
.../webmonitor/testutils/HttpTestClient.java | 19 ++
.../flink/test/util/MiniClusterResource.java | 4 +-
3 files changed, 166 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 14602e3..f512766 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,15 +19,19 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
@@ -38,8 +42,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
import java.io.File;
@@ -47,8 +52,13 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
@@ -66,40 +76,49 @@ public class WebFrontendITCase extends TestLogger {
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
- private static LocalFlinkMiniCluster cluster;
+ private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
- private static int port = -1;
+ @ClassRule
+ public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ CLUSTER_CONFIGURATION,
+ NUM_TASK_MANAGERS,
+ NUM_SLOTS),
+ true
+ );
- @BeforeClass
- public static void initialize() throws Exception {
+ private static Configuration getClusterConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
- config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
- config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
- File logDir = File.createTempFile("TestBaseUtils-logdir", null);
- assertTrue("Unable to delete temp file", logDir.delete());
- assertTrue("Unable to create temp directory", logDir.mkdir());
- File logFile = new File(logDir, "jobmanager.log");
- File outFile = new File(logDir, "jobmanager.out");
+ try {
+ File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+ assertTrue("Unable to delete temp file", logDir.delete());
+ assertTrue("Unable to create temp directory", logDir.mkdir());
+ File logFile = new File(logDir, "jobmanager.log");
+ File outFile = new File(logDir, "jobmanager.out");
- Files.createFile(logFile.toPath());
- Files.createFile(outFile.toPath());
+ Files.createFile(logFile.toPath());
+ Files.createFile(outFile.toPath());
- config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
- config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+ config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
+ config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+ } catch (Exception e) {
+ throw new AssertionError("Could not setup test.", e);
+ }
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+ config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
- cluster = new LocalFlinkMiniCluster(config, false);
- cluster.start();
+ return config;
+ }
- port = cluster.webMonitor().get().getServerPort();
+ @After
+ public void tearDown() {
+ BlockingInvokable.reset();
}
@Test
public void getFrontPage() {
try {
- String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+ String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html");
String text = "Apache Flink Dashboard";
assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
} catch (Exception e) {
@@ -111,7 +130,7 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testResponseHeaders() throws Exception {
// check headers for successful json response
- URL taskManagersUrl = new URL("http://localhost:" + port + "/taskmanagers");
+ URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers");
HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection();
taskManagerConnection.setConnectTimeout(100000);
taskManagerConnection.connect();
@@ -127,14 +146,18 @@ public class WebFrontendITCase extends TestLogger {
Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType());
// check headers in case of an error
- URL notFoundJobUrl = new URL("http://localhost:" + port + "/jobs/dontexist");
+ URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist");
HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection();
notFoundJobConnection.setConnectTimeout(100000);
notFoundJobConnection.connect();
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
- Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
+ } else {
+ Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+ }
} else {
throw new RuntimeException("Request for non-existing job did not return an error.");
}
@@ -143,14 +166,14 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getNumberOfTaskManagers() {
try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode response = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -159,14 +182,14 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getTaskmanagers() throws Exception {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
assertNotNull(taskManagers);
- assertEquals(cluster.numTaskManagers(), taskManagers.size());
+ assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
JsonNode taskManager = taskManagers.get(0);
assertNotNull(taskManager);
@@ -176,21 +199,21 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getLogAndStdoutFiles() throws Exception {
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log");
assertTrue(logs.contains("job manager log"));
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout");
assertTrue(logs.contains("job manager out"));
}
@Test
public void getTaskManagerLogAndStdoutFiles() {
try {
- String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+ String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
@@ -198,15 +221,15 @@ public class WebFrontendITCase extends TestLogger {
JsonNode taskManager = taskManagers.get(0);
String id = taskManager.get("id").asText();
- WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+ WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
//we check for job manager log files, since no separate taskmanager logs exist
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
- String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+ String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log");
assertTrue(logs.contains("job manager log"));
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
- logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+ logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout");
assertTrue(logs.contains("job manager out"));
} catch (Exception e) {
e.printStackTrace();
@@ -217,12 +240,12 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void getConfiguration() {
try {
- String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+ String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config");
Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
assertEquals(
- cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
- conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+ CLUSTER_CONFIGURATION.getString(ConfigConstants.LOCAL_START_WEBSERVER, null),
+ conf.get(ConfigConstants.LOCAL_START_WEBSERVER));
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -232,29 +255,42 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testStop() throws Exception {
// this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+ assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
+ sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+ clusterClient.setDetached(true);
+ clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
+ // wait for tasks to be properly running
+ BlockingInvokable.latch.await();
+
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
- // Request the file from the web server
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ // stop the job
+ client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+ HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.getType());
+ assertEquals("{}", response.getContent());
+ } else {
+ // stop the job
client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
@@ -262,12 +298,15 @@ public class WebFrontendITCase extends TestLogger {
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
+ }
+ // wait for the stop request to take effect, i.e. until no running jobs remain
+ while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}
// ensure we can access job details when its finished (FLINK-4011)
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
client.sendGetRequest("/jobs/" + jid + "/config", timeout);
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
@@ -283,40 +322,89 @@ public class WebFrontendITCase extends TestLogger {
@Test
public void testStopYarn() throws Exception {
// this only works if there is no active job at this point
- assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+ assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
- sender.setInvokableClass(StoppableInvokable.class);
+ sender.setInvokableClass(BlockingInvokable.class);
final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();
- cluster.submitJobDetached(jobGraph);
+ ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+ clusterClient.setDetached(true);
+ clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
// wait for job to show up
- while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+ while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}
+ // wait for tasks to be properly running
+ BlockingInvokable.latch.await();
+
final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
final Deadline deadline = testTimeout.fromNow();
- while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
- try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+ while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+ try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
// Request the file from the web server
client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
- assertEquals(HttpResponseStatus.OK, response.getStatus());
+ if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ } else {
+ assertEquals(HttpResponseStatus.OK, response.getStatus());
+ }
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
Thread.sleep(20);
}
+ BlockingInvokable.reset();
+ }
+
+ private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+ return statusMessages.stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .map(JobStatusMessage::getJobId)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Test invokable that is stoppable and allows waiting for all subtasks to be running.
+ */
+ public static class BlockingInvokable extends AbstractInvokable implements StoppableTask {
+
+ private static CountDownLatch latch = new CountDownLatch(2);
+
+ private volatile boolean isRunning = true;
+
+ public BlockingInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ latch.countDown();
+ while (isRunning) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.isRunning = false;
+ }
+
+ public static void reset() {
+ latch = new CountDownLatch(2);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d9608fe..d94f7a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -186,6 +186,25 @@ public class HttpTestClient implements AutoCloseable {
}
/**
+ * Sends a simple PATCH request to the given path. You only specify the $path part of
+ * http://$host:$port/$path.
+ *
+ * @param path The $path to PATCH (http://$host:$port/$path)
+ */
+ public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
+ if (!path.startsWith("/")) {
+ path = "/" + path;
+ }
+
+ HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+ HttpMethod.PATCH, path);
+ getRequest.headers().set(HttpHeaders.Names.HOST, host);
+ getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+ sendRequest(getRequest, timeout);
+ }
+
+ /**
* Returns the next available HTTP response. A call to this method blocks until a response
* becomes available.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 160c1d1..8c21b37 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -55,9 +55,9 @@ public class MiniClusterResource extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
- private static final String CODEBASE_KEY = "codebase";
+ public static final String CODEBASE_KEY = "codebase";
- private static final String NEW_CODEBASE = "new";
+ public static final String NEW_CODEBASE = "new";
private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;