You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 08:27:12 UTC
[5/6] flink git commit: [FLINK-9811][test] Add test for jar handler
interactions
[FLINK-9811][test] Add test for jar handler interactions
This closes #6311.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37caee97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37caee97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37caee97
Branch: refs/heads/release-1.5
Commit: 37caee97acf2167702b356aff4d271bf861029d3
Parents: 67cd33e
Author: zentol <ch...@apache.org>
Authored: Wed Jul 11 18:41:06 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 10:27:02 2018 +0200
----------------------------------------------------------------------
flink-runtime-web/pom.xml | 28 +++
.../handlers/JarDeleteMessageParameters.java | 2 +-
.../webmonitor/handlers/JarListInfo.java | 6 +-
.../handlers/JarPlanMessageParameters.java | 2 +-
.../handlers/JarSubmissionITCase.java | 226 +++++++++++++++++++
.../webmonitor/handlers/utils/TestProgram.java | 31 +++
.../runtime/rest/messages/JobPlanInfo.java | 6 +
.../flink/runtime/util/BlobServerResource.java | 65 ++++++
8 files changed, 361 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index fc45ddb..149ec68 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -135,8 +135,36 @@ under the License.
<goal>test-jar</goal>
</goals>
</execution>
+ <execution>
+ <id>test-program-jar</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+ </includes>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.TestProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>test-program</finalName>
+ </configuration>
+ </execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <targetDir>${project.build.directory}</targetDir>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
index 9080409..2b70602 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
@@ -30,7 +30,7 @@ import java.util.Collections;
*/
public class JarDeleteMessageParameters extends MessageParameters {
- private JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+ public JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
index 9168686..4d8d4c9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
@@ -41,7 +41,7 @@ public class JarListInfo implements ResponseBody {
private String address;
@JsonProperty(JAR_LIST_FIELD_FILES)
- private List<JarFileInfo> jarFileList;
+ public List<JarFileInfo> jarFileList;
@JsonCreator
public JarListInfo(
@@ -85,10 +85,10 @@ public class JarListInfo implements ResponseBody {
public static final String JAR_FILE_FIELD_ENTRY = "entry";
@JsonProperty(JAR_FILE_FIELD_ID)
- private String id;
+ public String id;
@JsonProperty(JAR_FILE_FIELD_NAME)
- private String name;
+ public String name;
@JsonProperty(JAR_FILE_FIELD_UPLOADED)
private long uploaded;
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
index 7dd9950..8599a2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
@@ -31,7 +31,7 @@ import java.util.Collections;
*/
public class JarPlanMessageParameters extends MessageParameters {
- private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+ public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
new file mode 100644
index 0000000..e47a38a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.Matchers.containsString;
+
+/**
+ * Tests the entire lifecycle of a jar submission.
+ */
+public class JarSubmissionITCase extends TestLogger {
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public final BlobServerResource blobServerResource = new BlobServerResource();
+
+ @BeforeClass
+ public static void checkOS() {
+ Assume.assumeFalse("This test fails on Windows due to unclosed JarFiles, see FLINK-9844.", OperatingSystem.isWindows());
+ }
+
+ @Test
+ public void testJarSubmission() throws Exception {
+ final TestingDispatcherGateway restfulGateway = new TestingDispatcherGateway.Builder()
+ .setBlobServerPort(blobServerResource.getBlobServerPort())
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .build();
+ final JarHandlers handlers = new JarHandlers(temporaryFolder.newFolder().toPath(), restfulGateway);
+ final JarUploadHandler uploadHandler = handlers.uploadHandler;
+ final JarListHandler listHandler = handlers.listHandler;
+ final JarPlanHandler planHandler = handlers.planHandler;
+ final JarRunHandler runHandler = handlers.runHandler;
+ final JarDeleteHandler deleteHandler = handlers.deleteHandler;
+
+ // targetDir property is set via surefire configuration
+ final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve("test-program.jar");
+ final Path jar = Files.copy(originalJar, temporaryFolder.getRoot().toPath().resolve("test-program.jar"));
+
+ final String storedJarPath = uploadJar(uploadHandler, jar, restfulGateway);
+ final String storedJarName = Paths.get(storedJarPath).getFileName().toString();
+
+ final JarListInfo postUploadListResponse = listJars(listHandler, restfulGateway);
+ Assert.assertEquals(1, postUploadListResponse.jarFileList.size());
+ final JarListInfo.JarFileInfo listEntry = postUploadListResponse.jarFileList.iterator().next();
+ Assert.assertEquals(jar.getFileName().toString(), listEntry.name);
+ Assert.assertEquals(storedJarName, listEntry.id);
+
+ final JobPlanInfo planResponse = showPlan(planHandler, storedJarName, restfulGateway);
+ // we're only interested in the core functionality so checking for a small detail is sufficient
+ Assert.assertThat(planResponse.getJsonPlan(), containsString("TestProgram.java:29"));
+
+ runJar(runHandler, storedJarName, restfulGateway);
+
+ deleteJar(deleteHandler, storedJarName, restfulGateway);
+
+ final JarListInfo postDeleteListResponse = listJars(listHandler, restfulGateway);
+ Assert.assertEquals(0, postDeleteListResponse.jarFileList.size());
+ }
+
+ private static String uploadJar(JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) throws Exception {
+ HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.singletonList(jar.toFile()));
+ final JarUploadResponseBody uploadResponse = handler.handleRequest(uploadRequest, restfulGateway)
+ .get();
+ return uploadResponse.getFilename();
+ }
+
+ private static JarListInfo listJars(JarListHandler handler, RestfulGateway restfulGateway) throws Exception {
+ HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ EmptyMessageParameters.getInstance());
+ return handler.handleRequest(listRequest, restfulGateway)
+ .get();
+ }
+
+ private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
+ JarPlanMessageParameters planParameters = JarPlanHeaders.getInstance().getUnresolvedMessageParameters();
+ HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ planParameters,
+ Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName),
+ Collections.emptyMap(),
+ Collections.emptyList());
+ return handler.handleRequest(planRequest, restfulGateway)
+ .get();
+ }
+
+ private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception {
+ final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+ HandlerRequest<EmptyRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ runParameters,
+ Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName),
+ Collections.emptyMap(),
+ Collections.emptyList());
+ return handler.handleRequest(runRequest, restfulGateway)
+ .get();
+ }
+
+ private static void deleteJar(JarDeleteHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
+ JarDeleteMessageParameters deleteParameters = JarDeleteHeaders.getInstance().getUnresolvedMessageParameters();
+ HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> deleteRequest = new HandlerRequest<>(
+ EmptyRequestBody.getInstance(),
+ deleteParameters,
+ Collections.singletonMap(deleteParameters.jarIdPathParameter.getKey(), jarName),
+ Collections.emptyMap(),
+ Collections.emptyList());
+ handler.handleRequest(deleteRequest, restfulGateway)
+ .get();
+ }
+
+ private static class JarHandlers {
+ final JarUploadHandler uploadHandler;
+ final JarListHandler listHandler;
+ final JarPlanHandler planHandler;
+ final JarRunHandler runHandler;
+ final JarDeleteHandler deleteHandler;
+
+ JarHandlers(final Path jarDir, final TestingDispatcherGateway restfulGateway) {
+ final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+ final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+ final Time timeout = Time.seconds(10);
+ final Map<String, String> responseHeaders = Collections.emptyMap();
+ final Executor executor = TestingUtils.defaultExecutor();
+
+ uploadHandler = new JarUploadHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarUploadHeaders.getInstance(),
+ jarDir,
+ executor);
+
+ listHandler = new JarListHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarListHeaders.getInstance(),
+ jarDir.toFile(),
+ executor);
+
+ planHandler = new JarPlanHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarPlanHeaders.getInstance(),
+ jarDir,
+ new Configuration(),
+ executor);
+
+ runHandler = new JarRunHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarRunHeaders.getInstance(),
+ jarDir,
+ new Configuration(),
+ executor);
+
+ deleteHandler = new JarDeleteHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarDeleteHeaders.getInstance(),
+ jarDir,
+ executor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
new file mode 100644
index 0000000..19d4678
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Simple test program that prints the elements "hello" and "world" using an {@link ExecutionEnvironment}.
+ */
+public class TestProgram {
+ public static void main(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements("hello", "world").print(); // NOTE(review): JarSubmissionITCase expects "TestProgram.java:29" in the job plan, presumably this line - keep this call on line 29 of the file
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
index 7263b36..965702d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -55,6 +56,11 @@ public class JobPlanInfo implements ResponseBody {
this.jsonPlan = jsonPlan;
}
+ @JsonIgnore
+ public String getJsonPlan() {
+ return jsonPlan.json;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
new file mode 100644
index 0000000..080ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A simple {@link ExternalResource} to be used by tests that require a {@link BlobServer}.
+ *
+ * <p>The server is created and started in {@link #before()} with a storage directory inside a
+ * private {@link TemporaryFolder}, and closed again in {@link #after()}.
+ */
+public class BlobServerResource extends ExternalResource {
+	private static final Logger LOG = LoggerFactory.getLogger(BlobServerResource.class);
+	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private BlobServer blobServer;
+
+	/**
+	 * Creates the temporary storage directory and starts a {@link BlobServer} that uses it.
+	 *
+	 * @throws Throwable if the temporary folder cannot be created or the server cannot be started
+	 */
+	@Override
+	protected void before() throws Throwable {
+		temporaryFolder.create();
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	/**
+	 * Closes the {@link BlobServer} and afterwards removes the temporary folder.
+	 *
+	 * <p>A failure to close the server is only logged so that the folder is still cleaned up.
+	 */
+	@Override
+	protected void after() {
+		// tear down in reverse order of the set-up: shut the server down first so that it is no
+		// longer running when the folder containing its storage directory is deleted
+		try {
+			blobServer.close();
+		} catch (IOException e) {
+			LOG.error("Exception while shutting down blob server.", e);
+		}
+
+		temporaryFolder.delete();
+	}
+
+	/**
+	 * Returns the port reported by the {@link BlobServer}.
+	 *
+	 * <p>The server is only created in {@link #before()}, so this must not be called earlier.
+	 */
+	public int getBlobServerPort() {
+		return blobServer.getPort();
+	}
+}