You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 06:46:31 UTC
[3/3] flink git commit: [FLINK-9499][rest] Support JSON request in
JarHandlers
[FLINK-9499][rest] Support JSON request in JarHandlers
This closes #6330.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fefe866b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fefe866b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fefe866b
Branch: refs/heads/master
Commit: fefe866bad47b1c4a2f92eded19bc7a5059f1277
Parents: 230f817
Author: zentol <ch...@apache.org>
Authored: Fri Jul 13 14:00:10 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 08:46:20 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/rest_dispatcher.html | 26 +-
flink-runtime-web/pom.xml | 41 +++
.../webmonitor/handlers/JarRunHandler.java | 75 ++++-
.../webmonitor/handlers/JarRunHeaders.java | 7 +-
.../webmonitor/handlers/JarRunRequestBody.java | 109 +++++++
.../handlers/JarRunHandlerParameterTest.java | 314 +++++++++++++++++++
.../webmonitor/handlers/JarRunHandlerTest.java | 3 +-
.../handlers/JarRunRequestBodyTest.java | 56 ++++
.../handlers/JarSubmissionITCase.java | 4 +-
.../webmonitor/testutils/ParameterProgram.java | 36 +++
.../scripts/modules/submit/submit.ctrl.coffee | 9 +-
.../scripts/modules/submit/submit.svc.coffee | 4 +-
.../web-dashboard/web/js/hs/index.js | 4 +-
flink-runtime-web/web-dashboard/web/js/index.js | 2 +-
14 files changed, 662 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 6ed59be..ab0f4e0 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -360,11 +360,31 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#1936993190">Request</button>
- <div id="1936993190" class="collapse">
+ <button data-toggle="collapse" data-target="#315035146">Request</button>
+ <div id="315035146" class="collapse">
<pre>
<code>
-{} </code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
+ "properties" : {
+ "entryClass" : {
+ "type" : "string"
+ },
+ "programArgs" : {
+ "type" : "string"
+ },
+ "parallelism" : {
+ "type" : "integer"
+ },
+ "allowNonRestoredState" : {
+ "type" : "boolean"
+ },
+ "savepointPath" : {
+ "type" : "string"
+ }
+ }
+} </code>
</pre>
</div>
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 51be16f..f4c4d0a 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -34,6 +34,11 @@ under the License.
<packaging>jar</packaging>
+ <properties>
+ <test.parameterProgram.name>parameter-program</test.parameterProgram.name>
+ <test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+ </properties>
+
<dependencies>
<!-- ===================================================
@@ -136,6 +141,7 @@ under the License.
</goals>
</execution>
<execution>
+ <!-- Used for JarHandler tests -->
<id>test-program-jar</id>
<phase>process-test-classes</phase>
<goals>
@@ -153,6 +159,39 @@ under the License.
<finalName>test-program</finalName>
</configuration>
</execution>
+ <execution>
+ <!-- Used for JarHandler tests -->
+ <id>test-parameter-program-jar</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+ </includes>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.runtime.webmonitor.testutils.ParameterProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>${test.parameterProgram.name}</finalName>
+ </configuration>
+ </execution>
+ <execution>
+ <!-- Used for JarHandler tests -->
+ <id>test-parameter-program-jar-without-manifest</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+ </includes>
+ <finalName>${test.ParameterProgramNoManifest.name}</finalName>
+ </configuration>
+ </execution>
</executions>
</plugin>
@@ -162,6 +201,8 @@ under the License.
<configuration>
<systemPropertyVariables>
<targetDir>${project.build.directory}</targetDir>
+ <parameterJarName>${test.parameterProgram.name}</parameterJarName>
+ <parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
</systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 858a05c..bae4ba8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -33,13 +33,15 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.slf4j.Logger;
+
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -61,7 +63,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
* Handler to submit jobs uploaded via the Web UI.
*/
public class JarRunHandler extends
- AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+ AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
private final Path jarDir;
@@ -74,7 +76,7 @@ public class JarRunHandler extends
final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
final Time timeout,
final Map<String, String> responseHeaders,
- final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
+ final MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
final Path jarDir,
final Configuration configuration,
final Executor executor) {
@@ -87,15 +89,33 @@ public class JarRunHandler extends
@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
- @Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
+ @Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
@Nonnull final DispatcherGateway gateway) throws RestHandlerException {
+ final JarRunRequestBody requestBody = request.getRequestBody();
+
final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
final Path jarFile = jarDir.resolve(pathParameter);
- final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
- final List<String> programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
- final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT);
+ final String entryClass = fromRequestBodyOrQueryParameter(
+ emptyToNull(requestBody.getEntryClassName()),
+ () -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
+ null,
+ log);
+
+ final List<String> programArgs = tokenizeArguments(
+ fromRequestBodyOrQueryParameter(
+ emptyToNull(requestBody.getProgramArguments()),
+ () -> getQueryParameter(request, ProgramArgsQueryParameter.class),
+ null,
+ log));
+
+ final int parallelism = fromRequestBodyOrQueryParameter(
+ requestBody.getParallelism(),
+ () -> getQueryParameter(request, ParallelismQueryParameter.class),
+ ExecutionConfig.PARALLELISM_DEFAULT,
+ log);
+
final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);
final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
@@ -134,12 +154,22 @@ public class JarRunHandler extends
});
}
- private static SavepointRestoreSettings getSavepointRestoreSettings(
- final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
+ private SavepointRestoreSettings getSavepointRestoreSettings(
+ final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws RestHandlerException {
- final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
- final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
+ final JarRunRequestBody requestBody = request.getRequestBody();
+
+ final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
+ requestBody.getAllowNonRestoredState(),
+ () -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
+ false,
+ log);
+ final String savepointPath = fromRequestBodyOrQueryParameter(
+ emptyToNull(requestBody.getSavepointPath()),
+ () -> emptyToNull(getQueryParameter(request, SavepointPathQueryParameter.class)),
+ null,
+ log);
final SavepointRestoreSettings savepointRestoreSettings;
if (savepointPath != null) {
savepointRestoreSettings = SavepointRestoreSettings.forPath(
@@ -151,6 +181,29 @@ public class JarRunHandler extends
return savepointRestoreSettings;
}
+ /**
+ * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
+ * if it is not null, otherwise returns the default value.
+ */
+ private static <T> T fromRequestBodyOrQueryParameter(
+ T requestValue,
+ SupplierWithException<T, RestHandlerException> queryParameterExtractor,
+ T defaultValue,
+ Logger log) throws RestHandlerException {
+ if (requestValue != null) {
+ return requestValue;
+ } else {
+ T queryParameterValue = queryParameterExtractor.get();
+ if (queryParameterValue != null) {
+ log.warn("Configuring the job submission via query parameters is deprecated." +
+ " Please migrate to submitting a JSON request instead.");
+ return queryParameterValue;
+ } else {
+ return defaultValue;
+ }
+ }
+ }
+
private CompletableFuture<JobGraph> getJobGraphAsync(
final Path jarFile,
@Nullable final String entryClass,
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index a1ad955..fa062e9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -27,7 +26,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
/**
* {@link MessageHeaders} for {@link JarRunHandler}.
*/
-public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+public class JarRunHeaders implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
private static final JarRunHeaders INSTANCE = new JarRunHeaders();
@@ -44,8 +43,8 @@ public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunRes
}
@Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
+ public Class<JarRunRequestBody> getRequestClass() {
+ return JarRunRequestBody.class;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
new file mode 100644
index 0000000..b30ae61
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link RequestBody} for running a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarRunRequestBody implements RequestBody {
+
+ private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+ private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
+ private static final String FIELD_NAME_PARALLELISM = "parallelism";
+ private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
+ private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+
+ @JsonProperty(FIELD_NAME_ENTRY_CLASS)
+ @Nullable
+ private String entryClassName;
+
+ @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+ @Nullable
+ private String programArguments;
+
+ @JsonProperty(FIELD_NAME_PARALLELISM)
+ @Nullable
+ private Integer parallelism;
+
+ @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+ @Nullable
+ private Boolean allowNonRestoredState;
+
+ @JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+ @Nullable
+ private String savepointPath;
+
+ public JarRunRequestBody() {
+ this(null, null, null, null, null);
+ }
+
+ @JsonCreator
+ public JarRunRequestBody(
+ @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
+ @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
+ @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
+ @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
+ @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
+ this.entryClassName = entryClassName;
+ this.programArguments = programArguments;
+ this.parallelism = parallelism;
+ this.allowNonRestoredState = allowNonRestoredState;
+ this.savepointPath = savepointPath;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public String getEntryClassName() {
+ return entryClassName;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public String getProgramArguments() {
+ return programArguments;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public Integer getParallelism() {
+ return parallelism;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public Boolean getAllowNonRestoredState() {
+ return allowNonRestoredState;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public String getSavepointPath() {
+ return savepointPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
new file mode 100644
index 0000000..8bb358a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the parameter handling of the {@link JarRunHandler}.
+ */
+public class JarRunHandlerParameterTest extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ @ClassRule
+ public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
+
+ private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
+ private static JarRunHandler handler;
+ private static Path jarWithManifest;
+ private static Path jarWithoutManifest;
+ private static TestingDispatcherGateway restfulGateway;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ Path jarDir = TMP.newFolder().toPath();
+
+ // properties are set property by surefire plugin
+ final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
+ final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
+ final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+
+ jarWithManifest = Files.copy(
+ jarLocation.resolve(parameterProgramJarName),
+ jarDir.resolve("program-with-manifest.jar"));
+ jarWithoutManifest = Files.copy(
+ jarLocation.resolve(parameterProgramWithoutManifestJarName),
+ jarDir.resolve("program-without-manifest.jar"));
+
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ TMP.newFolder().getAbsolutePath());
+
+ restfulGateway = new TestingDispatcherGateway.Builder()
+ .setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
+ .setSubmitFunction(jobGraph -> {
+ lastSubmittedJobGraphReference.set(jobGraph);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+ final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+ final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+ final Time timeout = Time.seconds(10);
+ final Map<String, String> responseHeaders = Collections.emptyMap();
+ final Executor executor = TestingUtils.defaultExecutor();
+
+ handler = new JarRunHandler(
+ localAddressFuture,
+ gatewayRetriever,
+ timeout,
+ responseHeaders,
+ JarRunHeaders.getInstance(),
+ jarDir,
+ new Configuration(),
+ executor);
+ }
+
+ @Before
+ public void reset() {
+ ParameterProgram.actualArguments = null;
+ }
+
+ @Test
+ public void testDefaultParameters() throws Exception {
+ // baseline, ensure that reasonable defaults are chosen
+ sendRequestAndValidateGraph(
+ handler,
+ restfulGateway,
+ () -> createRequest(
+ new JarRunRequestBody(),
+ JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+ jarWithManifest
+ ),
+ jobGraph -> {
+ Assert.assertEquals(0, ParameterProgram.actualArguments.length);
+
+ Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
+
+ final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+ Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
+ Assert.assertNull(savepointRestoreSettings.getRestorePath());
+ }
+ );
+ }
+
+ @Test
+ public void testConfigurationViaQueryParameters() throws Exception {
+ // configure submission via query parameters
+ sendRequestAndValidateGraph(
+ handler,
+ restfulGateway,
+ () -> {
+ final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+ parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
+ parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
+ parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+ parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
+ parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
+
+ return createRequest(
+ new JarRunRequestBody(),
+ parameters,
+ jarWithoutManifest
+ );
+ },
+ jobGraph -> {
+ Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+ Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+ Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+ Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+ Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+ Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+ final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+ Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+ Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+ }
+ );
+ }
+
+ @Test
+ public void testConfigurationViaJsonRequest() throws Exception {
+ sendRequestAndValidateGraph(
+ handler,
+ restfulGateway,
+ () -> {
+ final JarRunRequestBody jsonRequest = new JarRunRequestBody(
+ ParameterProgram.class.getCanonicalName(),
+ "--host localhost --port 1234",
+ 4,
+ true,
+ "/foo/bar"
+ );
+
+ return createRequest(
+ jsonRequest,
+ JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+ jarWithoutManifest
+ );
+ },
+ jobGraph -> {
+ Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+ Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+ Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+ Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+ Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+ Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+ final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+ Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+ Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+ }
+ );
+ }
+
+ @Test
+ public void testParameterPrioritization() throws Exception {
+ // configure submission via query parameters and JSON request, JSON should be prioritized
+ sendRequestAndValidateGraph(
+ handler,
+ restfulGateway,
+ () -> {
+ final JarRunRequestBody jsonRequest = new JarRunRequestBody(
+ ParameterProgram.class.getCanonicalName(),
+ "--host localhost --port 1234",
+ 4,
+ true,
+ "/foo/bar"
+ );
+
+ final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+ parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
+ parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
+ parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+ parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+ parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
+
+ return createRequest(
+ jsonRequest,
+ parameters,
+ jarWithoutManifest
+ );
+ },
+ jobGraph -> {
+ Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+ Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+ Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+ Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+ Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+ Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+ final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+ Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+ Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+ }
+ );
+ }
+
+ private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
+ JarRunRequestBody requestBody,
+ JarRunMessageParameters parameters,
+ Path jar) throws HandlerRequestException {
+
+ final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
+ .filter(MessageParameter::isResolved)
+ .collect(Collectors.toMap(
+ MessageParameter::getKey,
+ JarRunHandlerParameterTest::getValuesAsString
+ ));
+
+ return new HandlerRequest<>(
+ requestBody,
+ JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+ Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()),
+ queryParameterAsMap,
+ Collections.emptyList()
+ );
+ }
+
+ private static void sendRequestAndValidateGraph(
+ JarRunHandler handler,
+ DispatcherGateway dispatcherGateway,
+ SupplierWithException<HandlerRequest<JarRunRequestBody, JarRunMessageParameters>, HandlerRequestException> requestSupplier,
+ ThrowingConsumer<JobGraph, AssertionError> validator) throws Exception {
+
+ handler.handleRequest(requestSupplier.get(), dispatcherGateway)
+ .get();
+
+ JobGraph submittedJobGraph = lastSubmittedJobGraphReference.getAndSet(null);
+
+ validator.accept(submittedJobGraph);
+ }
+
+ private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
+ ExecutionConfig executionConfig;
+ try {
+ executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
+ } catch (Exception e) {
+ throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
+ }
+ return executionConfig;
+ }
+
+ private static <X> List<String> getValuesAsString(MessageQueryParameter<X> parameter) {
+ final List<X> values = parameter.getValue();
+ return values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 3e8e633..6427f4d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
@@ -85,7 +84,7 @@ public class JarRunHandlerTest {
int port = clientConfig.getInteger(RestOptions.PORT);
try {
- client.sendRequest(host, port, headers, parameters, EmptyRequestBody.getInstance())
+ client.sendRequest(host, port, headers, parameters, new JarRunRequestBody())
.get();
} catch (Exception e) {
Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
new file mode 100644
index 0000000..0706873
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JarRunRequestBody}.
+ */
+public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRunRequestBody> {
+
+ @Override
+ protected Class<JarRunRequestBody> getTestRequestClass() {
+ return JarRunRequestBody.class;
+ }
+
+ @Override
+ protected JarRunRequestBody getTestRequestInstance() throws Exception {
+ return new JarRunRequestBody(
+ "hello",
+ "world",
+ 4,
+ true,
+ "foo/bar"
+ );
+ }
+
+ @Override
+ protected void assertOriginalEqualsToUnmarshalled(
+ final JarRunRequestBody expected,
+ final JarRunRequestBody actual) {
+ assertEquals(expected.getEntryClassName(), actual.getEntryClassName());
+ assertEquals(expected.getProgramArguments(), actual.getProgramArguments());
+ assertEquals(expected.getParallelism(), actual.getParallelism());
+ assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
+ assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
index e47a38a..e64c708 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -139,8 +139,8 @@ public class JarSubmissionITCase extends TestLogger {
private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception {
final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
- HandlerRequest<EmptyRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
- EmptyRequestBody.getInstance(),
+ HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
+ new JarRunRequestBody(),
runParameters,
Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName),
Collections.emptyMap(),
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
new file mode 100644
index 0000000..c96cce8
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.testutils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Simple test program that exposes passed arguments.
+ */
+public class ParameterProgram {
+
+ public static volatile String[] actualArguments = null;
+
+ public static void main(String[] args) throws Exception {
+ actualArguments = args;
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements("hello", "world").print();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index baf8396..4f9e6d4 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -118,25 +118,32 @@ angular.module('flinkApp')
$scope.state['plan-button'] = "Show Plan"
$scope.error = null
+ request = {}
+ # legacy compatibility
queryParameters = {}
if $scope.state['entry-class']
+ request['entryClass'] = $scope.state['entry-class']
queryParameters['entry-class'] = $scope.state['entry-class']
if $scope.state.parallelism
+ request['parallelism'] = $scope.state['parallelism']
queryParameters['parallelism'] = $scope.state['parallelism']
if $scope.state['program-args']
+ request['programArgs'] = $scope.state['program-args']
queryParameters['program-args'] = $scope.state['program-args']
if $scope.state['savepointPath']
+ request['savepointPath'] = $scope.state['savepointPath']
queryParameters['savepointPath'] = $scope.state['savepointPath']
if $scope.state['allowNonRestoredState']
+ request['allowNonRestoredState'] = $scope.state['allowNonRestoredState']
queryParameters['allowNonRestoredState'] = $scope.state['allowNonRestoredState']
JobSubmitService.runJob(
- $scope.state.selected, queryParameters
+ $scope.state.selected, request, queryParameters
).then (data) ->
if action == $scope.state['action-time']
$scope.state['submit-button'] = "Submit"
http://git-wip-us.apache.org/repos/asf/flink/blob/fefe866b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
index 989bdba..98cf51e 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
@@ -52,10 +52,10 @@ angular.module('flinkApp')
deferred.promise
- @runJob = (id, args) ->
+ @runJob = (id, request, queryParameters) ->
deferred = $q.defer()
- $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
+ $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", request, {params: queryParameters})
.success (data, status, headers, config) ->
deferred.resolve(data)
.error (err) ->