You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 08:27:10 UTC

[3/6] flink git commit: [FLINK-9499][rest] Support JSON request in JarHandlers

[FLINK-9499][rest] Support JSON request in JarHandlers

This closes #6330.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fe5545f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fe5545f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fe5545f

Branch: refs/heads/release-1.5
Commit: 5fe5545fd45a6e8d4eab262450f24bb5199c8f69
Parents: 37caee9
Author: zentol <ch...@apache.org>
Authored: Fri Jul 13 14:00:10 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 10:27:02 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   |  26 +-
 flink-runtime-web/pom.xml                       |  41 +++
 .../webmonitor/handlers/JarRunHandler.java      |  75 ++++-
 .../webmonitor/handlers/JarRunHeaders.java      |   7 +-
 .../webmonitor/handlers/JarRunRequestBody.java  | 109 +++++++
 .../handlers/JarRunHandlerParameterTest.java    | 314 +++++++++++++++++++
 .../webmonitor/handlers/JarRunHandlerTest.java  |   3 +-
 .../handlers/JarRunRequestBodyTest.java         |  56 ++++
 .../handlers/JarSubmissionITCase.java           |   4 +-
 .../webmonitor/testutils/ParameterProgram.java  |  36 +++
 .../scripts/modules/submit/submit.ctrl.coffee   |   9 +-
 .../scripts/modules/submit/submit.svc.coffee    |   4 +-
 .../web-dashboard/web/js/hs/index.js            |   4 +-
 flink-runtime-web/web-dashboard/web/js/index.js |   2 +-
 14 files changed, 662 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 034a3d3..b9692d0 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -360,11 +360,31 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1936993190">Request</button>
-        <div id="1936993190" class="collapse">
+        <button data-toggle="collapse" data-target="#315035146">Request</button>
+        <div id="315035146" class="collapse">
           <pre>
             <code>
-{}            </code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
+  "properties" : {
+    "entryClass" : {
+      "type" : "string"
+    },
+    "programArgs" : {
+      "type" : "string"
+    },
+    "parallelism" : {
+      "type" : "integer"
+    },
+    "allowNonRestoredState" : {
+      "type" : "boolean"
+    },
+    "savepointPath" : {
+      "type" : "string"
+    }
+  }
+}            </code>
           </pre>
          </div>
       </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 149ec68..5890c64 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -34,6 +34,11 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<properties>
+		<test.parameterProgram.name>parameter-program</test.parameterProgram.name>
+		<test.ParameterProgramNoManifest.name>parameter-program-without-manifest</test.ParameterProgramNoManifest.name>
+	</properties>
+
 	<dependencies>
 
 		<!-- ===================================================
@@ -136,6 +141,7 @@ under the License.
 						</goals>
 					</execution>
 					<execution>
+						<!-- Used for JarHandler tests -->
 						<id>test-program-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>
@@ -153,6 +159,39 @@ under the License.
 							<finalName>test-program</finalName>
 						</configuration>
 					</execution>
+					<execution>
+						<!-- Used for JarHandler tests -->
+						<id>test-parameter-program-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+							</includes>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.runtime.webmonitor.testutils.ParameterProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>${test.parameterProgram.name}</finalName>
+						</configuration>
+					</execution>
+					<execution>
+						<!-- Used for JarHandler tests -->
+						<id>test-parameter-program-jar-without-manifest</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>org/apache/flink/runtime/webmonitor/handlers/utils/TestProgram.java</include>
+							</includes>
+							<finalName>${test.ParameterProgramNoManifest.name}</finalName>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 
@@ -162,6 +201,8 @@ under the License.
 				<configuration>
 					<systemPropertyVariables>
 						<targetDir>${project.build.directory}</targetDir>
+						<parameterJarName>${test.parameterProgram.name}</parameterJarName>
+						<parameterJarWithoutManifestName>${test.ParameterProgramNoManifest.name}</parameterJarWithoutManifestName>
 					</systemPropertyVariables>
 				</configuration>
 			</plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index b3e1ff8..0a2e65f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -33,13 +33,15 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.slf4j.Logger;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -62,7 +64,7 @@ import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
  * Handler to submit jobs uploaded via the Web UI.
  */
 public class JarRunHandler extends
-		AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+		AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
 
 	private final Path jarDir;
 
@@ -75,7 +77,7 @@ public class JarRunHandler extends
 			final GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> responseHeaders,
-			final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
+			final MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
 			final Path jarDir,
 			final Configuration configuration,
 			final Executor executor) {
@@ -88,15 +90,33 @@ public class JarRunHandler extends
 
 	@Override
 	protected CompletableFuture<JarRunResponseBody> handleRequest(
-			@Nonnull final HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request,
+			@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
 			@Nonnull final DispatcherGateway gateway) throws RestHandlerException {
 
+		final JarRunRequestBody requestBody = request.getRequestBody();
+
 		final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
 		final Path jarFile = jarDir.resolve(pathParameter);
 
-		final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
-		final List<String> programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
-		final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT);
+		final String entryClass = fromRequestBodyOrQueryParameter(
+			emptyToNull(requestBody.getEntryClassName()),
+			() -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
+			null,
+			log);
+
+		final List<String> programArgs = tokenizeArguments(
+			fromRequestBodyOrQueryParameter(
+				emptyToNull(requestBody.getProgramArguments()),
+				() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
+				null,
+				log));
+
+		final int parallelism = fromRequestBodyOrQueryParameter(
+			requestBody.getParallelism(),
+			() -> getQueryParameter(request, ParallelismQueryParameter.class),
+			ExecutionConfig.PARALLELISM_DEFAULT,
+			log);
+
 		final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);
 
 		final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
@@ -140,12 +160,22 @@ public class JarRunHandler extends
 			});
 	}
 
-	private static SavepointRestoreSettings getSavepointRestoreSettings(
-			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
+	private SavepointRestoreSettings getSavepointRestoreSettings(
+			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
 				throws RestHandlerException {
 
-		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
-		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
+		final JarRunRequestBody requestBody = request.getRequestBody();
+
+		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
+			requestBody.getAllowNonRestoredState(),
+			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
+			false,
+			log);
+		final String savepointPath = fromRequestBodyOrQueryParameter(
+			emptyToNull(requestBody.getSavepointPath()),
+			() -> emptyToNull(getQueryParameter(request, SavepointPathQueryParameter.class)),
+			null,
+			log);
 		final SavepointRestoreSettings savepointRestoreSettings;
 		if (savepointPath != null) {
 			savepointRestoreSettings = SavepointRestoreSettings.forPath(
@@ -157,6 +187,29 @@ public class JarRunHandler extends
 		return savepointRestoreSettings;
 	}
 
+	/**
+	 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
+	 * if it is not null, otherwise returns the default value.
+	 */
+	private static <T> T fromRequestBodyOrQueryParameter(
+			T requestValue,
+			SupplierWithException<T, RestHandlerException> queryParameterExtractor,
+			T defaultValue,
+			Logger log) throws RestHandlerException {
+		if (requestValue != null) {
+			return requestValue;
+		} else {
+			T queryParameterValue = queryParameterExtractor.get();
+			if (queryParameterValue != null) {
+				log.warn("Configuring the job submission via query parameters is deprecated." +
+					" Please migrate to submitting a JSON request instead.");
+				return queryParameterValue;
+			} else {
+				return defaultValue;
+			}
+		}
+	}
+
 	private CompletableFuture<JobGraph> getJobGraphAsync(
 			final Path jarFile,
 			@Nullable final String entryClass,

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index a1ad955..fa062e9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -27,7 +26,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /**
  * {@link MessageHeaders} for {@link JarRunHandler}.
  */
-public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+public class JarRunHeaders implements MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
 
 	private static final JarRunHeaders INSTANCE = new JarRunHeaders();
 
@@ -44,8 +43,8 @@ public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, JarRunRes
 	}
 
 	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
+	public Class<JarRunRequestBody> getRequestClass() {
+		return JarRunRequestBody.class;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
new file mode 100644
index 0000000..b30ae61
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link RequestBody} for running a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarRunRequestBody implements RequestBody {
+
+	private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+	private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
+	private static final String FIELD_NAME_PARALLELISM = "parallelism";
+	private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
+	private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+
+	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
+	@Nullable
+	private String entryClassName;
+
+	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+	@Nullable
+	private String programArguments;
+
+	@JsonProperty(FIELD_NAME_PARALLELISM)
+	@Nullable
+	private Integer parallelism;
+
+	@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+	@Nullable
+	private Boolean allowNonRestoredState;
+
+	@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+	@Nullable
+	private String savepointPath;
+
+	public JarRunRequestBody() {
+		this(null, null, null, null, null);
+	}
+
+	@JsonCreator
+	public JarRunRequestBody(
+			@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
+			@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
+			@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
+			@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
+			@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
+		this.entryClassName = entryClassName;
+		this.programArguments = programArguments;
+		this.parallelism = parallelism;
+		this.allowNonRestoredState = allowNonRestoredState;
+		this.savepointPath = savepointPath;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public String getEntryClassName() {
+		return entryClassName;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public String getProgramArguments() {
+		return programArguments;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public Integer getParallelism() {
+		return parallelism;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public Boolean getAllowNonRestoredState() {
+		return allowNonRestoredState;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public String getSavepointPath() {
+		return savepointPath;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
new file mode 100644
index 0000000..8bb358a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the parameter handling of the {@link JarRunHandler}.
+ */
+public class JarRunHandlerParameterTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	@ClassRule
+	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
+
+	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
+	private static JarRunHandler handler;
+	private static Path jarWithManifest;
+	private static Path jarWithoutManifest;
+	private static TestingDispatcherGateway restfulGateway;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		Path jarDir = TMP.newFolder().toPath();
+
+		// properties are set property by surefire plugin
+		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
+		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
+		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+
+		jarWithManifest = Files.copy(
+			jarLocation.resolve(parameterProgramJarName),
+			jarDir.resolve("program-with-manifest.jar"));
+		jarWithoutManifest = Files.copy(
+			jarLocation.resolve(parameterProgramWithoutManifestJarName),
+			jarDir.resolve("program-without-manifest.jar"));
+
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			TMP.newFolder().getAbsolutePath());
+
+		restfulGateway = new TestingDispatcherGateway.Builder()
+			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
+			.setSubmitFunction(jobGraph -> {
+				lastSubmittedJobGraphReference.set(jobGraph);
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.build();
+		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+		final Time timeout = Time.seconds(10);
+		final Map<String, String> responseHeaders = Collections.emptyMap();
+		final Executor executor = TestingUtils.defaultExecutor();
+
+		handler = new JarRunHandler(
+			localAddressFuture,
+			gatewayRetriever,
+			timeout,
+			responseHeaders,
+			JarRunHeaders.getInstance(),
+			jarDir,
+			new Configuration(),
+			executor);
+	}
+
+	@Before
+	public void reset() {
+		ParameterProgram.actualArguments = null;
+	}
+
+	@Test
+	public void testDefaultParameters() throws Exception {
+		// baseline, ensure that reasonable defaults are chosen
+		sendRequestAndValidateGraph(
+			handler,
+			restfulGateway,
+			() -> createRequest(
+				new JarRunRequestBody(),
+				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+				jarWithManifest
+			),
+			jobGraph -> {
+				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
+
+				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
+
+				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
+				Assert.assertNull(savepointRestoreSettings.getRestorePath());
+			}
+		);
+	}
+
+	@Test
+	public void testConfigurationViaQueryParameters() throws Exception {
+		// configure submission via query parameters
+		sendRequestAndValidateGraph(
+			handler,
+			restfulGateway,
+			() -> {
+				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
+				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
+				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
+				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
+
+				return createRequest(
+					new JarRunRequestBody(),
+					parameters,
+					jarWithoutManifest
+				);
+			},
+			jobGraph -> {
+				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+			}
+		);
+	}
+
+	@Test
+	public void testConfigurationViaJsonRequest() throws Exception {
+		sendRequestAndValidateGraph(
+			handler,
+			restfulGateway,
+			() -> {
+				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
+					ParameterProgram.class.getCanonicalName(),
+					"--host localhost --port 1234",
+					4,
+					true,
+					"/foo/bar"
+				);
+
+				return createRequest(
+					jsonRequest,
+					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+					jarWithoutManifest
+				);
+			},
+			jobGraph -> {
+				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+			}
+		);
+	}
+
+	@Test
+	public void testParameterPrioritization() throws Exception {
+		// configure submission via query parameters and JSON request, JSON should be prioritized
+		sendRequestAndValidateGraph(
+			handler,
+			restfulGateway,
+			() -> {
+				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
+					ParameterProgram.class.getCanonicalName(),
+					"--host localhost --port 1234",
+					4,
+					true,
+					"/foo/bar"
+				);
+
+				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
+				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
+				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
+				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
+
+				return createRequest(
+					jsonRequest,
+					parameters,
+					jarWithoutManifest
+				);
+			},
+			jobGraph -> {
+				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
+				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
+				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
+				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
+				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
+
+				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
+
+				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
+			}
+		);
+	}
+
+	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
+			JarRunRequestBody requestBody,
+			JarRunMessageParameters parameters,
+			Path jar) throws HandlerRequestException {
+
+		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
+			.filter(MessageParameter::isResolved)
+			.collect(Collectors.toMap(
+				MessageParameter::getKey,
+				JarRunHandlerParameterTest::getValuesAsString
+			));
+
+		return new HandlerRequest<>(
+			requestBody,
+			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
+			Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()),
+			queryParameterAsMap,
+			Collections.emptyList()
+		);
+	}
+
+	private static void sendRequestAndValidateGraph(
+			JarRunHandler handler,
+			DispatcherGateway dispatcherGateway,
+			SupplierWithException<HandlerRequest<JarRunRequestBody, JarRunMessageParameters>, HandlerRequestException> requestSupplier,
+			ThrowingConsumer<JobGraph, AssertionError> validator) throws Exception {
+
+		handler.handleRequest(requestSupplier.get(), dispatcherGateway)
+			.get();
+
+		JobGraph submittedJobGraph = lastSubmittedJobGraphReference.getAndSet(null);
+
+		validator.accept(submittedJobGraph);
+	}
+
+	private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
+		ExecutionConfig executionConfig;
+		try {
+			executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
+		} catch (Exception e) {
+			throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
+		}
+		return executionConfig;
+	}
+
+	private static <X> List<String> getValuesAsString(MessageQueryParameter<X> parameter) {
+		final List<X> values = parameter.getValue();
+		return values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index aefe4f1..e3206c9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
@@ -84,7 +83,7 @@ public class JarRunHandlerTest {
 				int port = clientConfig.getInteger(RestOptions.PORT);
 
 				try {
-					client.sendRequest(host, port, headers, parameters, EmptyRequestBody.getInstance())
+					client.sendRequest(host, port, headers, parameters, new JarRunRequestBody())
 						.get();
 				} catch (Exception e) {
 					Optional<RestClientException> expected = ExceptionUtils.findThrowable(e, RestClientException.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
new file mode 100644
index 0000000..0706873
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JarRunRequestBody}.
+ */
+public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRunRequestBody> {
+
+	@Override
+	protected Class<JarRunRequestBody> getTestRequestClass() {
+		return JarRunRequestBody.class;
+	}
+
+	@Override
+	protected JarRunRequestBody getTestRequestInstance() throws Exception {
+		return new JarRunRequestBody(
+			"hello",
+			"world",
+			4,
+			true,
+			"foo/bar"
+		);
+	}
+
+	@Override
+	protected void assertOriginalEqualsToUnmarshalled(
+			final JarRunRequestBody expected,
+			final JarRunRequestBody actual) {
+		assertEquals(expected.getEntryClassName(), actual.getEntryClassName());
+		assertEquals(expected.getProgramArguments(), actual.getProgramArguments());
+		assertEquals(expected.getParallelism(), actual.getParallelism());
+		assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
+		assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
index e47a38a..e64c708 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -139,8 +139,8 @@ public class JarSubmissionITCase extends TestLogger {
 
 	private static JarRunResponseBody runJar(JarRunHandler handler, String jarName, DispatcherGateway restfulGateway) throws Exception {
 		final JarRunMessageParameters runParameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-		HandlerRequest<EmptyRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
-			EmptyRequestBody.getInstance(),
+		HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest = new HandlerRequest<>(
+			new JarRunRequestBody(),
 			runParameters,
 			Collections.singletonMap(runParameters.jarIdPathParameter.getKey(), jarName),
 			Collections.emptyMap(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
new file mode 100644
index 0000000..c96cce8
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/ParameterProgram.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.testutils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Simple test program that exposes passed arguments.
+ */
+public class ParameterProgram {
+
+	public static volatile String[] actualArguments = null;
+
+	public static void main(String[] args) throws Exception {
+		actualArguments = args;
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements("hello", "world").print();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index baf8396..4f9e6d4 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -118,25 +118,32 @@ angular.module('flinkApp')
       $scope.state['plan-button'] = "Show Plan"
       $scope.error = null
 
+      request = {}
+      # legacy compatibility
       queryParameters = {}
 
       if $scope.state['entry-class']
+        request['entryClass'] = $scope.state['entry-class']
         queryParameters['entry-class'] = $scope.state['entry-class']
 
       if $scope.state.parallelism
+        request['parallelism'] = $scope.state['parallelism']
         queryParameters['parallelism'] = $scope.state['parallelism']
 
       if $scope.state['program-args']
+        request['programArgs'] = $scope.state['program-args']
         queryParameters['program-args'] = $scope.state['program-args']
 
       if $scope.state['savepointPath']
+        request['savepointPath'] = $scope.state['savepointPath']
         queryParameters['savepointPath'] = $scope.state['savepointPath']
 
       if $scope.state['allowNonRestoredState']
+        request['allowNonRestoredState'] = $scope.state['allowNonRestoredState']
         queryParameters['allowNonRestoredState'] = $scope.state['allowNonRestoredState']
 
       JobSubmitService.runJob(
-        $scope.state.selected, queryParameters
+        $scope.state.selected, request, queryParameters
       ).then (data) ->
         if action == $scope.state['action-time']
           $scope.state['submit-button'] = "Submit"

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe5545f/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
index 989bdba..98cf51e 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee
@@ -52,10 +52,10 @@ angular.module('flinkApp')
 
     deferred.promise
 
-  @runJob = (id, args) ->
+  @runJob = (id, request, queryParameters) ->
     deferred = $q.defer()
 
-    $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", {}, {params: args})
+    $http.post(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/run", request, {params: queryParameters})
     .success (data, status, headers, config) ->
       deferred.resolve(data)
     .error (err) ->