You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 08:27:12 UTC

[5/6] flink git commit: [FLINK-9811][test] Add test for jar handler interactions

[FLINK-9811][test] Add test for jar handler interactions

This closes #6311.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37caee97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37caee97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37caee97

Branch: refs/heads/release-1.5
Commit: 37caee97acf2167702b356aff4d271bf861029d3
Parents: 67cd33e
Author: zentol <ch...@apache.org>
Authored: Wed Jul 11 18:41:06 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 10:27:02 2018 +0200

----------------------------------------------------------------------
 flink-runtime-web/pom.xml                       |  28 +++
 .../handlers/JarDeleteMessageParameters.java    |   2 +-
 .../webmonitor/handlers/JarListInfo.java        |   6 +-
 .../handlers/JarPlanMessageParameters.java      |   2 +-
 .../handlers/JarSubmissionITCase.java           | 226 +++++++++++++++++++
 .../webmonitor/handlers/utils/TestProgram.java  |  31 +++
 .../runtime/rest/messages/JobPlanInfo.java      |   6 +
 .../flink/runtime/util/BlobServerResource.java  |  65 ++++++
 8 files changed, 361 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index fc45ddb..149ec68 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -135,8 +135,36 @@ under the License.
 							<goal>test-jar</goal>
 						</goals>
 					</execution>
+					<execution>
+						<id>test-program-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+							</includes>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.runtime.webmonitor.handlers.utils.TestProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>test-program</finalName>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<systemPropertyVariables>
+						<targetDir>${project.build.directory}</targetDir>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
 		</plugins>
 	</build>
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
index 9080409..2b70602 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java
@@ -30,7 +30,7 @@ import java.util.Collections;
  */
 public class JarDeleteMessageParameters extends MessageParameters {
 
-	private JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+	public JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
 
 	@Override
 	public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
index 9168686..4d8d4c9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfo.java
@@ -41,7 +41,7 @@ public class JarListInfo implements ResponseBody {
 	private String address;
 
 	@JsonProperty(JAR_LIST_FIELD_FILES)
-	private List<JarFileInfo> jarFileList;
+	public List<JarFileInfo> jarFileList;
 
 	@JsonCreator
 	public JarListInfo(
@@ -85,10 +85,10 @@ public class JarListInfo implements ResponseBody {
 		public static final String JAR_FILE_FIELD_ENTRY = "entry";
 
 		@JsonProperty(JAR_FILE_FIELD_ID)
-		private String id;
+		public String id;
 
 		@JsonProperty(JAR_FILE_FIELD_NAME)
-		private String name;
+		public String name;
 
 		@JsonProperty(JAR_FILE_FIELD_UPLOADED)
 		private long uploaded;

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
index 7dd9950..8599a2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
@@ -31,7 +31,7 @@ import java.util.Collections;
  */
 public class JarPlanMessageParameters extends MessageParameters {
 
-	private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+	public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
 
 	private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
new file mode 100644
index 0000000..e47a38a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.hamcrest.Matchers.containsString;
+
+/**
+ * Tests the entire lifecycle of a jar submission.
+ */
+public class JarSubmissionITCase extends TestLogger {
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Rule
+	public final BlobServerResource blobServerResource = new BlobServerResource();
+
+	@BeforeClass
+	public static void checkOS() {
+		Assume.assumeFalse("This test fails on Windows due to unclosed JarFiles, see FLINK-9844.", OperatingSystem.isWindows());
+	}
+
+	@Test
+	public void testJarSubmission() throws Exception {
+		final TestingDispatcherGateway restfulGateway = new TestingDispatcherGateway.Builder()
+			.setBlobServerPort(blobServerResource.getBlobServerPort())
+			.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+			.build();
+		final JarHandlers handlers = new JarHandlers(temporaryFolder.newFolder().toPath(), restfulGateway);
+		final JarUploadHandler uploadHandler = handlers.uploadHandler;
+		final JarListHandler listHandler = handlers.listHandler;
+		final JarPlanHandler planHandler = handlers.planHandler;
+		final JarRunHandler runHandler = handlers.runHandler;
+		final JarDeleteHandler deleteHandler = handlers.deleteHandler;
+
+		// targetDir property is set via surefire configuration
+		final Path originalJar = Paths.get(System.getProperty("targetDir")).resolve("test-program.jar");
+		final Path jar = Files.copy(originalJar, temporaryFolder.getRoot().toPath().resolve("test-program.jar"));
+
+		final String storedJarPath = uploadJar(uploadHandler, jar, restfulGateway);
+		final String storedJarName = Paths.get(storedJarPath).getFileName().toString();
+
+		final JarListInfo postUploadListResponse = listJars(listHandler, restfulGateway);
+		Assert.assertEquals(1, postUploadListResponse.jarFileList.size());
+		final JarListInfo.JarFileInfo listEntry = postUploadListResponse.jarFileList.iterator().next();
+		Assert.assertEquals(jar.getFileName().toString(), listEntry.name);
+		Assert.assertEquals(storedJarName, listEntry.id);
+
+		final JobPlanInfo planResponse = showPlan(planHandler, storedJarName, restfulGateway);
+		// we're only interested in the core functionality so checking for a small detail is sufficient
+		Assert.assertThat(planResponse.getJsonPlan(), containsString("TestProgram.java:29"));
+
+		runJar(runHandler, storedJarName, restfulGateway);
+
+		deleteJar(deleteHandler, storedJarName, restfulGateway);
+
+		final JarListInfo postDeleteListResponse = listJars(listHandler, restfulGateway);
+		Assert.assertEquals(0, postDeleteListResponse.jarFileList.size());
+	}
+
+	private static String uploadJar(JarUploadHandler handler, Path jar, RestfulGateway restfulGateway) throws Exception {
+		HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			EmptyMessageParameters.getInstance(),
+			Collections.emptyMap(),
+			Collections.emptyMap(),
+			Collections.singletonList(jar.toFile()));
+		final JarUploadResponseBody uploadResponse = handler.handleRequest(uploadRequest, restfulGateway)
+			.get();
+		return uploadResponse.getFilename();
+	}
+
+	private static JarListInfo listJars(JarListHandler handler, RestfulGateway restfulGateway) throws Exception {
+		HandlerRequest<EmptyRequestBody, EmptyMessageParameters> listRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			EmptyMessageParameters.getInstance());
+		return handler.handleRequest(listRequest, restfulGateway)
+			.get();
+	}
+
+	private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
+		JarPlanMessageParameters planParameters = JarPlanHeaders.getInstance().getUnresolvedMessageParameters();
+		HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			planParameters,
+			Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName),
+			Collections.emptyMap(),
+			Collections.emptyList());
+		return handler.handleRequest(planRequest, restfulGateway)
+			.get();
+	}
+
+	private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception {
+		final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+		HandlerRequest<EmptyRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			runParameters,
+			Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName),
+			Collections.emptyMap(),
+			Collections.emptyList());
+		return handler.handleRequest(runRequest, restfulGateway)
+			.get();
+	}
+
+	private static void deleteJar(JarDeleteHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
+		JarDeleteMessageParameters deleteParameters = JarDeleteHeaders.getInstance().getUnresolvedMessageParameters();
+		HandlerRequest<EmptyRequestBody, JarDeleteMessageParameters> deleteRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			deleteParameters,
+			Collections.singletonMap(deleteParameters.jarIdPathParameter.getKey(), jarName),
+			Collections.emptyMap(),
+			Collections.emptyList());
+		handler.handleRequest(deleteRequest, restfulGateway)
+			.get();
+	}
+
+	private static class JarHandlers {
+		final JarUploadHandler uploadHandler;
+		final JarListHandler listHandler;
+		final JarPlanHandler planHandler;
+		final JarRunHandler runHandler;
+		final JarDeleteHandler deleteHandler;
+
+		JarHandlers(final Path jarDir, final TestingDispatcherGateway restfulGateway) {
+			final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+			final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+			final Time timeout = Time.seconds(10);
+			final Map<String, String> responseHeaders = Collections.emptyMap();
+			final Executor executor = TestingUtils.defaultExecutor();
+
+			uploadHandler = new JarUploadHandler(
+				localAddressFuture,
+				gatewayRetriever,
+				timeout,
+				responseHeaders,
+				JarUploadHeaders.getInstance(),
+				jarDir,
+				executor);
+
+			listHandler = new JarListHandler(
+				localAddressFuture,
+				gatewayRetriever,
+				timeout,
+				responseHeaders,
+				JarListHeaders.getInstance(),
+				jarDir.toFile(),
+				executor);
+
+			planHandler = new JarPlanHandler(
+				localAddressFuture,
+				gatewayRetriever,
+				timeout,
+				responseHeaders,
+				JarPlanHeaders.getInstance(),
+				jarDir,
+				new Configuration(),
+				executor);
+
+			runHandler = new JarRunHandler(
+				localAddressFuture,
+				gatewayRetriever,
+				timeout,
+				responseHeaders,
+				JarRunHeaders.getInstance(),
+				jarDir,
+				new Configuration(),
+				executor);
+
+			deleteHandler = new JarDeleteHandler(
+				localAddressFuture,
+				gatewayRetriever,
+				timeout,
+				responseHeaders,
+				JarDeleteHeaders.getInstance(),
+				jarDir,
+				executor);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
new file mode 100644
index 0000000..19d4678
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Simple test program.
+ */
+public class TestProgram {
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements("hello", "world").print();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
index 7263b36..965702d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -55,6 +56,11 @@ public class JobPlanInfo implements ResponseBody {
 		this.jsonPlan = jsonPlan;
 	}
 
+	@JsonIgnore
+	public String getJsonPlan() {
+		return jsonPlan.json;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/37caee97/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
new file mode 100644
index 0000000..080ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A simple {@link ExternalResource} to be used by tests that require a {@link BlobServer}.
+ */
+public class BlobServerResource extends ExternalResource {
+	private static final Logger LOG = LoggerFactory.getLogger(BlobServerResource.class);
+	private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private BlobServer blobServer;
+
+	protected void before() throws Throwable {
+		temporaryFolder.create();
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	protected void after() {
+		temporaryFolder.delete();
+
+		try {
+			blobServer.close();
+		} catch (IOException e) {
+			LOG.error("Exception while shutting down blob server.", e);
+		}
+	}
+
+	public int getBlobServerPort() {
+		return blobServer.getPort();
+	}
+}