You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/09 14:40:26 UTC

[GitHub] zentol closed pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

zentol closed pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
index 6ee705eeb0a..4977e5ccfeb 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -272,7 +272,7 @@
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
-      <td colspan="2">Returns the dataflow plan of a job contained in a jar previously uploaded via '/jars/upload'.</td>
+      <td colspan="2">Returns the dataflow plan of a job contained in a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.</td>
     </tr>
     <tr>
       <td colspan="2">Path parameters</td>
@@ -290,19 +290,40 @@
     <tr>
       <td colspan="2">
         <ul>
+<li><code>program-args</code> (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan</li>
+<li><code>programArg</code> (optional): Comma-separated list of program arguments.</li>
 <li><code>entry-class</code> (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.</li>
 <li><code>parallelism</code> (optional): Positive integer value that specifies the desired parallelism for the job.</li>
-<li><code>program-args</code> (optional): String value that specifies the arguments for the program or plan.</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-181694384">Request</button>
-        <div id="-181694384" class="collapse">
+        <button data-toggle="collapse" data-target="#550027726">Request</button>
+        <div id="550027726" class="collapse">
           <pre>
             <code>
-{}            </code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarPlanRequestBody",
+  "properties" : {
+    "entryClass" : {
+      "type" : "string"
+    },
+    "programArgs" : {
+      "type" : "string"
+    },
+    "programArgsList" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
+    "parallelism" : {
+      "type" : "integer"
+    }
+  }
+}            </code>
           </pre>
          </div>
       </td>
@@ -338,7 +359,7 @@
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
-      <td colspan="2">Submits a job by running a jar previously uploaded via '/jars/upload'.</td>
+      <td colspan="2">Submits a job by running a jar previously uploaded via '/jars/upload'. Program arguments can be passed both via the JSON request (recommended) or query parameters.</td>
     </tr>
     <tr>
       <td colspan="2">Path parameters</td>
@@ -356,11 +377,12 @@
     <tr>
       <td colspan="2">
         <ul>
-<li><code>program-args</code> (optional): String value that specifies the arguments for the program or plan.</li>
-<li><code>entry-class</code> (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.</li>
-<li><code>parallelism</code> (optional): Positive integer value that specifies the desired parallelism for the job.</li>
 <li><code>allowNonRestoredState</code> (optional): Boolean value that specifies whether the job submission should be rejected if the savepoint contains state that cannot be mapped back to the job.</li>
 <li><code>savepointPath</code> (optional): String value that specifies the path of the savepoint to restore the job from.</li>
+<li><code>program-args</code> (optional): Deprecated, please use 'programArg' instead. String value that specifies the arguments for the program or plan</li>
+<li><code>programArg</code> (optional): Comma-separated list of program arguments.</li>
+<li><code>entry-class</code> (optional): String value that specifies the fully qualified name of the entry point class. Overrides the class defined in the jar file manifest.</li>
+<li><code>parallelism</code> (optional): Positive integer value that specifies the desired parallelism for the job.</li>
         </ul>
       </td>
     </tr>
@@ -380,6 +402,12 @@
     "programArgs" : {
       "type" : "string"
     },
+    "programArgsList" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
     "parallelism" : {
       "type" : "integer"
     },
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
new file mode 100644
index 00000000000..f1bf00a83f4
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}.
+ */
+abstract class JarMessageParameters extends MessageParameters {
+
+	final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+
+	final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
+
+	final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
+
+	final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
+
+	final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singletonList(jarIdPathParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.unmodifiableList(Arrays.asList(
+			programArgsQueryParameter,
+			programArgQueryParameter,
+			entryClassQueryParameter,
+			parallelismQueryParameter));
+	}
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 2f0631610fc..94a53643af1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,44 +18,34 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments;
-import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
 
 /**
  * This handler handles requests to fetch the plan for a jar.
  */
 public class JarPlanHandler
-		extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobPlanInfo, JarPlanMessageParameters> {
+		extends AbstractRestHandler<RestfulGateway, JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
 
 	private final Path jarDir;
 
@@ -63,47 +53,49 @@
 
 	private final Executor executor;
 
+	private final Function<JobGraph, JobPlanInfo> planGenerator;
+
 	public JarPlanHandler(
 			final CompletableFuture<String> localRestAddress,
 			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			final Time timeout,
 			final Map<String, String> responseHeaders,
-			final MessageHeaders<EmptyRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
+			final MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
 			final Path jarDir,
 			final Configuration configuration,
 			final Executor executor) {
+		this(
+			localRestAddress, leaderRetriever, timeout, responseHeaders,
+			messageHeaders, jarDir, configuration, executor,
+			jobGraph -> new JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph)));
+	}
+
+	public JarPlanHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> responseHeaders,
+			final MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
+			final Path jarDir,
+			final Configuration configuration,
+			final Executor executor,
+			final Function<JobGraph, JobPlanInfo> planGenerator) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
 		this.jarDir = requireNonNull(jarDir);
 		this.configuration = requireNonNull(configuration);
 		this.executor = requireNonNull(executor);
+		this.planGenerator = planGenerator;
 	}
 
 	@Override
 	protected CompletableFuture<JobPlanInfo> handleRequest(
-			@Nonnull final HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> request,
+			@Nonnull final HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> request,
 			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
-
-		final String jarId = request.getPathParameter(JarIdPathParameter.class);
-		final String entryClass = emptyToNull(HandlerRequestUtils.getQueryParameter(request, EntryClassQueryParameter.class));
-		final Integer parallelism = HandlerRequestUtils.getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT);
-		final List<String> programArgs = tokenizeArguments(HandlerRequestUtils.getQueryParameter(request, ProgramArgsQueryParameter.class));
-		final Path jarFile = jarDir.resolve(jarId);
+		final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
 
 		return CompletableFuture.supplyAsync(() -> {
-			final JobGraph jobGraph;
-			try {
-				final PackagedProgram packagedProgram = new PackagedProgram(
-					jarFile.toFile(),
-					entryClass,
-					programArgs.toArray(new String[programArgs.size()]));
-				jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
-			} catch (final ProgramInvocationException e) {
-				throw new CompletionException(new RestHandlerException(
-					e.getMessage(),
-					HttpResponseStatus.INTERNAL_SERVER_ERROR,
-					e));
-			}
-			return new JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph));
+			final JobGraph jobGraph = context.toJobGraph(configuration);
+			return planGenerator.apply(jobGraph);
 		}, executor);
 	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
index 38fc705e84b..09311d8aca0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 
@@ -28,7 +27,7 @@
 /**
  * Message headers for {@link JarPlanHandler}.
  */
-public class JarPlanHeaders implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JarPlanMessageParameters> {
+public class JarPlanHeaders implements MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
 
 	private static final JarPlanHeaders INSTANCE = new JarPlanHeaders();
 
@@ -43,8 +42,8 @@ public HttpResponseStatus getResponseStatusCode() {
 	}
 
 	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
+	public Class<JarPlanRequestBody> getRequestClass() {
+		return JarPlanRequestBody.class;
 	}
 
 	@Override
@@ -68,6 +67,7 @@ public static JarPlanHeaders getInstance() {
 
 	@Override
 	public String getDescription() {
-		return "Returns the dataflow plan of a job contained in a jar previously uploaded via '" + JarUploadHeaders.URL + "'.";
+		return "Returns the dataflow plan of a job contained in a jar previously uploaded via '" + JarUploadHeaders.URL + "'. " +
+			"Program arguments can be passed both via the JSON request (recommended) or query parameters.";
 	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
index 8599a2ccf44..e7619b36361 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
@@ -19,36 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
- * Message parameters for {@link JarPlanHandler}.
+ * {@link MessageParameters} for {@link JarPlanHandler}.
  */
-public class JarPlanMessageParameters extends MessageParameters {
-
-	public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
-
-	private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
-
-	private final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
-
-	private final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
-
-	@Override
-	public Collection<MessagePathParameter<?>> getPathParameters() {
-		return Collections.singletonList(jarIdPathParameter);
-	}
-
-	@Override
-	public Collection<MessageQueryParameter<?>> getQueryParameters() {
-		return Collections.unmodifiableCollection(Arrays.asList(
-			entryClassQueryParameter,
-			parallelismQueryParameter,
-			programArgsQueryParameter));
-	}
+class JarPlanMessageParameters extends JarMessageParameters {
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java
new file mode 100644
index 00000000000..8e209b41cc1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * {@link RequestBody} for querying the plan from a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarPlanRequestBody extends JarRequestBody {
+	JarPlanRequestBody() {
+		super(null, null, null, null);
+	}
+
+	@JsonCreator
+	JarPlanRequestBody(
+		@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
+		@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
+		@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
+		@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) {
+		super(entryClassName, programArguments, programArgumentsList, parallelism);
+	}
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
new file mode 100644
index 00000000000..144ac05058c
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Base class for {@link RequestBody} for running a jar or querying the plan.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class JarRequestBody implements RequestBody {
+
+	static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+	static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
+	static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = "programArgsList";
+	static final String FIELD_NAME_PARALLELISM = "parallelism";
+
+	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
+	@Nullable
+	private String entryClassName;
+
+	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+	@Nullable
+	private String programArguments;
+
+	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
+	@Nullable
+	private List<String> programArgumentsList;
+
+	@JsonProperty(FIELD_NAME_PARALLELISM)
+	@Nullable
+	private Integer parallelism;
+
+	JarRequestBody() {
+		this(null, null, null, null);
+	}
+
+	@JsonCreator
+	JarRequestBody(
+		@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
+		@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
+		@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
+		@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism) {
+		this.entryClassName = entryClassName;
+		this.programArguments = programArguments;
+		this.programArgumentsList = programArgumentsList;
+		this.parallelism = parallelism;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public String getEntryClassName() {
+		return entryClassName;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public String getProgramArguments() {
+		return programArguments;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public List<String> getProgramArgumentsList() {
+		return programArgumentsList;
+	}
+
+	@Nullable
+	@JsonIgnore
+	public Integer getParallelism() {
+		return parallelism;
+	}
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index bae4ba86a0e..679dfe988ef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.client.ClientUtils;
@@ -34,29 +30,24 @@
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import org.slf4j.Logger;
-
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
 import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
-import static org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments;
 import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
 
 /**
@@ -91,39 +82,11 @@ public JarRunHandler(
 	protected CompletableFuture<JarRunResponseBody> handleRequest(
 			@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
 			@Nonnull final DispatcherGateway gateway) throws RestHandlerException {
-
-		final JarRunRequestBody requestBody = request.getRequestBody();
-
-		final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
-		final Path jarFile = jarDir.resolve(pathParameter);
-
-		final String entryClass = fromRequestBodyOrQueryParameter(
-			emptyToNull(requestBody.getEntryClassName()),
-			() -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
-			null,
-			log);
-
-		final List<String> programArgs = tokenizeArguments(
-			fromRequestBodyOrQueryParameter(
-				emptyToNull(requestBody.getProgramArguments()),
-				() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
-				null,
-				log));
-
-		final int parallelism = fromRequestBodyOrQueryParameter(
-			requestBody.getParallelism(),
-			() -> getQueryParameter(request, ParallelismQueryParameter.class),
-			ExecutionConfig.PARALLELISM_DEFAULT,
-			log);
+		final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
 
 		final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request);
 
-		final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
-			jarFile,
-			entryClass,
-			programArgs,
-			savepointRestoreSettings,
-			parallelism);
+		final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(context, savepointRestoreSettings);
 
 		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
 
@@ -181,52 +144,11 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
 		return savepointRestoreSettings;
 	}
 
-	/**
-	 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
-	 * if it is not null, otherwise returns the default value.
-	 */
-	private static <T> T fromRequestBodyOrQueryParameter(
-			T requestValue,
-			SupplierWithException<T, RestHandlerException> queryParameterExtractor,
-			T defaultValue,
-			Logger log) throws RestHandlerException {
-		if (requestValue != null) {
-			return requestValue;
-		} else {
-			T queryParameterValue = queryParameterExtractor.get();
-			if (queryParameterValue != null) {
-				log.warn("Configuring the job submission via query parameters is deprecated." +
-					" Please migrate to submitting a JSON request instead.");
-				return queryParameterValue;
-			} else {
-				return defaultValue;
-			}
-		}
-	}
-
 	private CompletableFuture<JobGraph> getJobGraphAsync(
-			final Path jarFile,
-			@Nullable final String entryClass,
-			final List<String> programArgs,
-			final SavepointRestoreSettings savepointRestoreSettings,
-			final int parallelism) {
-
+			JarHandlerContext context,
+			final SavepointRestoreSettings savepointRestoreSettings) {
 		return CompletableFuture.supplyAsync(() -> {
-			if (!Files.exists(jarFile)) {
-				throw new CompletionException(new RestHandlerException(
-					String.format("Jar file %s does not exist", jarFile), HttpResponseStatus.BAD_REQUEST));
-			}
-
-			final JobGraph jobGraph;
-			try {
-				final PackagedProgram packagedProgram = new PackagedProgram(
-					jarFile.toFile(),
-					entryClass,
-					programArgs.toArray(new String[programArgs.size()]));
-				jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
-			} catch (final ProgramInvocationException e) {
-				throw new CompletionException(e);
-			}
+			final JobGraph jobGraph = context.toJobGraph(configuration);
 			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 			return jobGraph;
 		}, executor);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index fa062e906bc..6085528d0ce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -68,6 +68,7 @@ public static JarRunHeaders getInstance() {
 
 	@Override
 	public String getDescription() {
-		return "Submits a job by running a jar previously uploaded via '" + JarUploadHeaders.URL + "'.";
+		return "Submits a job by running a jar previously uploaded via '" + JarUploadHeaders.URL + "'. " +
+			"Program arguments can be passed both via the JSON request (recommended) or query parameters.";
 	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
index 78267db22a0..ed69b81017f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,32 +29,18 @@
 /**
  * {@link MessageParameters} for {@link JarRunHandler}.
  */
-public class JarRunMessageParameters extends MessageParameters {
+public class JarRunMessageParameters extends JarMessageParameters {
 
-	public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+	final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-	public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
-
-	public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
-
-	public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
-
-	public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-
-	public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
-
-	@Override
-	public Collection<MessagePathParameter<?>> getPathParameters() {
-		return Collections.singletonList(jarIdPathParameter);
-	}
+	final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
 
 	@Override
 	public Collection<MessageQueryParameter<?>> getQueryParameters() {
-		return Collections.unmodifiableCollection(Arrays.asList(
-			programArgsQueryParameter,
-			entryClassQueryParameter,
-			parallelismQueryParameter,
+		Collection<MessageQueryParameter<?>> pars = new ArrayList<>(Arrays.asList(
 			allowNonRestoredStateQueryParameter,
 			savepointPathQueryParameter));
+		pars.addAll(super.getQueryParameters());
+		return Collections.unmodifiableCollection(pars);
 	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index b30ae6113ef..9e4ee0f8c2a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -27,30 +27,16 @@
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 /**
  * {@link RequestBody} for running a jar.
  */
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public class JarRunRequestBody implements RequestBody {
-
-	private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
-	private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
-	private static final String FIELD_NAME_PARALLELISM = "parallelism";
+public class JarRunRequestBody extends JarRequestBody {
 	private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
 	private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
 
-	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
-	@Nullable
-	private String entryClassName;
-
-	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
-	@Nullable
-	private String programArguments;
-
-	@JsonProperty(FIELD_NAME_PARALLELISM)
-	@Nullable
-	private Integer parallelism;
-
 	@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
 	@Nullable
 	private Boolean allowNonRestoredState;
@@ -60,41 +46,22 @@
 	private String savepointPath;
 
 	public JarRunRequestBody() {
-		this(null, null, null, null, null);
+		this(null, null, null, null, null, null);
 	}
 
 	@JsonCreator
 	public JarRunRequestBody(
 			@Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
 			@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
+			@Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> programArgumentsList,
 			@Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
 			@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
 			@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
-		this.entryClassName = entryClassName;
-		this.programArguments = programArguments;
-		this.parallelism = parallelism;
+		super(entryClassName, programArguments, programArgumentsList, parallelism);
 		this.allowNonRestoredState = allowNonRestoredState;
 		this.savepointPath = savepointPath;
 	}
 
-	@Nullable
-	@JsonIgnore
-	public String getEntryClassName() {
-		return entryClassName;
-	}
-
-	@Nullable
-	@JsonIgnore
-	public String getProgramArguments() {
-		return programArguments;
-	}
-
-	@Nullable
-	@JsonIgnore
-	public Integer getParallelism() {
-		return parallelism;
-	}
-
 	@Nullable
 	@JsonIgnore
 	public Boolean getAllowNonRestoredState() {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java
new file mode 100644
index 00000000000..769da5b7a4f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying one or more arguments for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...)
+ */
+public class ProgramArgQueryParameter extends StringQueryParameter {
+	static final String PROGRAM_ARG_PARAMETER_NAME = "programArg";
+
+	public ProgramArgQueryParameter() {
+		super(PROGRAM_ARG_PARAMETER_NAME, MessageParameter.MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public String getDescription() {
+		return "Comma-separated list of program arguments.";
+	}
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
index d4b0a4fdd23..2cb77e59417 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
@@ -22,8 +22,10 @@
 
 /**
  * Query parameter specifying the arguments for the program.
+ * @deprecated please, use {@link JarRequestBody#FIELD_NAME_PROGRAM_ARGUMENTS_LIST}
  * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...)
  */
+@Deprecated
 public class ProgramArgsQueryParameter extends StringQueryParameter {
 
 	public ProgramArgsQueryParameter() {
@@ -32,6 +34,8 @@ public ProgramArgsQueryParameter() {
 
 	@Override
 	public String getDescription() {
-		return "String value that specifies the arguments for the program or plan.";
+		return String.format("Deprecated, please use '%s' instead. " +
+			"String value that specifies the arguments for the program or plan",
+			ProgramArgQueryParameter.PROGRAM_ARG_PARAMETER_NAME);
 	}
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index 3b1b7449303..9026cf00c5e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -18,14 +18,43 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.EntryClassQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter;
+import org.apache.flink.runtime.webmonitor.handlers.JarRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.ParallelismQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.ProgramArgQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.ProgramArgsQueryParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
@@ -34,6 +63,93 @@
  */
 public class JarHandlerUtils {
 
+	/** Standard jar handler parameters parsed from request. */
+	public static class JarHandlerContext {
+		private final Path jarFile;
+		private final String entryClass;
+		private final List<String> programArgs;
+		private final int parallelism;
+
+		private JarHandlerContext(Path jarFile, String entryClass, List<String> programArgs, int parallelism) {
+			this.jarFile = jarFile;
+			this.entryClass = entryClass;
+			this.programArgs = programArgs;
+			this.parallelism = parallelism;
+		}
+
+		public static <R extends JarRequestBody> JarHandlerContext fromRequest(
+				@Nonnull final HandlerRequest<R, ?> request,
+				@Nonnull final Path jarDir,
+				@Nonnull final Logger log) throws RestHandlerException {
+			final JarRequestBody requestBody = request.getRequestBody();
+
+			final String pathParameter = request.getPathParameter(JarIdPathParameter.class);
+			Path jarFile = jarDir.resolve(pathParameter);
+
+			String entryClass = fromRequestBodyOrQueryParameter(
+				emptyToNull(requestBody.getEntryClassName()),
+				() -> emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)),
+				null,
+				log);
+
+			List<String> programArgs = JarHandlerUtils.getProgramArgs(request, log);
+
+			int parallelism = fromRequestBodyOrQueryParameter(
+				requestBody.getParallelism(),
+				() -> getQueryParameter(request, ParallelismQueryParameter.class),
+				ExecutionConfig.PARALLELISM_DEFAULT,
+				log);
+
+			return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism);
+		}
+
+		public JobGraph toJobGraph(Configuration configuration) {
+			if (!Files.exists(jarFile)) {
+				throw new CompletionException(new RestHandlerException(
+					String.format("Jar file %s does not exist", jarFile), HttpResponseStatus.BAD_REQUEST));
+			}
+
+			try {
+				final PackagedProgram packagedProgram = new PackagedProgram(
+					jarFile.toFile(),
+					entryClass,
+					programArgs.toArray(new String[0]));
+				return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
+			} catch (final ProgramInvocationException e) {
+				throw new CompletionException(e);
+			}
+		}
+	}
+
+	/** Parse program arguments in jar run or plan request. */
+	private static <R extends JarRequestBody, M extends MessageParameters> List<String> getProgramArgs(
+			HandlerRequest<R, M> request, Logger log) throws RestHandlerException {
+		JarRequestBody requestBody = request.getRequestBody();
+		@SuppressWarnings("deprecation")
+		List<String> programArgs = tokenizeArguments(
+			fromRequestBodyOrQueryParameter(
+				emptyToNull(requestBody.getProgramArguments()),
+				() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
+				null,
+				log));
+		List<String> programArgsList =
+			fromRequestBodyOrQueryParameter(
+				requestBody.getProgramArgumentsList(),
+				() -> request.getQueryParameter(ProgramArgQueryParameter.class),
+				null,
+				log);
+		if (!programArgsList.isEmpty()) {
+			if (!programArgs.isEmpty()) {
+				throw new RestHandlerException(
+					"Confusing request: programArgs and programArgsList are specified, please, use only programArgsList",
+					HttpResponseStatus.BAD_REQUEST);
+			}
+			return programArgsList;
+		} else {
+			return programArgs;
+		}
+	}
+
 	private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
 
 	/**
@@ -48,7 +164,8 @@
 	 *
 	 * <strong>WARNING: </strong>This method does not respect escaped quotes.
 	 */
-	public static List<String> tokenizeArguments(@Nullable final String args) {
+	@VisibleForTesting
+	static List<String> tokenizeArguments(@Nullable final String args) {
 		if (args == null) {
 			return Collections.emptyList();
 		}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
new file mode 100644
index 00000000000..5c0ad507b4b
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+
+/** Base test class for jar request handlers. */
+public abstract class JarHandlerParameterTest
+	<REQB extends JarRequestBody, M extends JarMessageParameters> extends TestLogger {
+	enum ProgramArgsParType {
+		String,
+		List,
+		Both
+	}
+
+	static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
+	static final int PARALLELISM = 4;
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	@ClassRule
+	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
+
+	static final AtomicReference<JobGraph> LAST_SUBMITTED_JOB_GRAPH_REFERENCE = new AtomicReference<>();
+
+	static TestingDispatcherGateway restfulGateway;
+	static Path jarDir;
+	static GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+	static CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+	static Time timeout = Time.seconds(10);
+	static Map<String, String> responseHeaders = Collections.emptyMap();
+	static Executor executor = TestingUtils.defaultExecutor();
+
+	private static Path jarWithManifest;
+	private static Path jarWithoutManifest;
+
+	static void init() throws Exception {
+		jarDir = TMP.newFolder().toPath();
+
+		// properties are set property by surefire plugin
+		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
+		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
+		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+
+		jarWithManifest = Files.copy(
+			jarLocation.resolve(parameterProgramJarName),
+			jarDir.resolve("program-with-manifest.jar"));
+		jarWithoutManifest = Files.copy(
+			jarLocation.resolve(parameterProgramWithoutManifestJarName),
+			jarDir.resolve("program-without-manifest.jar"));
+
+		restfulGateway = new TestingDispatcherGateway.Builder()
+			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
+			.setSubmitFunction(jobGraph -> {
+				LAST_SUBMITTED_JOB_GRAPH_REFERENCE.set(jobGraph);
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.build();
+
+		gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
+		localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
+		timeout = Time.seconds(10);
+		responseHeaders = Collections.emptyMap();
+		executor = TestingUtils.defaultExecutor();
+	}
+
+	@Before
+	public void reset() {
+		ParameterProgram.actualArguments = null;
+	}
+
+	@Test
+	public void testDefaultParameters() throws Exception {
+		// baseline, ensure that reasonable defaults are chosen
+		handleRequest(createRequest(
+			getDefaultJarRequestBody(),
+			getUnresolvedJarMessageParameters(),
+			getUnresolvedJarMessageParameters(),
+			jarWithManifest));
+		validateDefaultGraph();
+	}
+
+	@Test
+	public void testConfigurationViaQueryParametersWithProgArgsAsString() throws Exception {
+		testConfigurationViaQueryParameters(ProgramArgsParType.String);
+	}
+
+	@Test
+	public void testConfigurationViaQueryParametersWithProgArgsAsList() throws Exception {
+		testConfigurationViaQueryParameters(ProgramArgsParType.List);
+	}
+
+	@Test
+	public void testConfigurationViaQueryParametersFailWithProgArgsAsStringAndList() throws Exception {
+		try {
+			testConfigurationViaQueryParameters(ProgramArgsParType.Both);
+			fail("RestHandlerException is excepted");
+		} catch (RestHandlerException e) {
+			assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
+		}
+	}
+
+	private void testConfigurationViaQueryParameters(ProgramArgsParType programArgsParType) throws Exception {
+		// configure submission via query parameters
+		handleRequest(createRequest(
+			getDefaultJarRequestBody(),
+			getJarMessageParameters(programArgsParType),
+			getUnresolvedJarMessageParameters(),
+			jarWithoutManifest));
+		validateGraph();
+	}
+
+	@Test
+	public void testConfigurationViaJsonRequestWithProgArgsAsString() throws Exception {
+		testConfigurationViaJsonRequest(ProgramArgsParType.String);
+	}
+
+	@Test
+	public void testConfigurationViaJsonRequestWithProgArgsAsList() throws Exception {
+		testConfigurationViaJsonRequest(ProgramArgsParType.List);
+	}
+
+	@Test
+	public void testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() throws Exception {
+		try {
+			testConfigurationViaJsonRequest(ProgramArgsParType.Both);
+			fail("RestHandlerException is excepted");
+		} catch (RestHandlerException e) {
+			assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
+		}
+	}
+
+	private void testConfigurationViaJsonRequest(ProgramArgsParType programArgsParType) throws Exception {
+		handleRequest(createRequest(
+			getJarRequestBody(programArgsParType),
+			getUnresolvedJarMessageParameters(),
+			getUnresolvedJarMessageParameters(),
+			jarWithoutManifest
+		));
+		validateGraph();
+	}
+
+	@Test
+	public void testParameterPrioritizationWithProgArgsAsString() throws Exception {
+		testParameterPrioritization(ProgramArgsParType.String);
+	}
+
+	@Test
+	public void testParameterPrioritizationWithProgArgsAsList() throws Exception {
+		testParameterPrioritization(ProgramArgsParType.List);
+	}
+
+	@Test
+	public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
+		try {
+			testParameterPrioritization(ProgramArgsParType.Both);
+			fail("RestHandlerException is excepted");
+		} catch (RestHandlerException e) {
+			assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
+		}
+	}
+
+	private void testParameterPrioritization(ProgramArgsParType programArgsParType) throws Exception {
+		// configure submission via query parameters and JSON request, JSON should be prioritized
+		handleRequest(createRequest(
+			getJarRequestBody(programArgsParType),
+			getWrongJarMessageParameters(programArgsParType),
+			getUnresolvedJarMessageParameters(),
+			jarWithoutManifest));
+		validateGraph();
+	}
+
+	static String getProgramArgsString(ProgramArgsParType programArgsParType) {
+		return programArgsParType == ProgramArgsParType.String || programArgsParType == ProgramArgsParType.Both
+			? String.join(" ", PROG_ARGS) : null;
+	}
+
+	static List<String> getProgramArgsList(ProgramArgsParType programArgsParType) {
+		return programArgsParType == ProgramArgsParType.List || programArgsParType == ProgramArgsParType.Both
+			? Arrays.asList(PROG_ARGS) : null;
+	}
+
+	private static <REQB extends JarRequestBody, M extends JarMessageParameters>
+	HandlerRequest<REQB, M> createRequest(
+		REQB requestBody, M parameters, M unresolvedMessageParameters, Path jar)
+		throws HandlerRequestException {
+
+		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
+			.filter(MessageParameter::isResolved)
+			.collect(Collectors.toMap(
+				MessageParameter::getKey,
+				JarHandlerParameterTest::getValuesAsString
+			));
+
+		return new HandlerRequest<>(
+			requestBody,
+			unresolvedMessageParameters,
+			Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()),
+			queryParameterAsMap,
+			Collections.emptyList()
+		);
+	}
+
+	private static <X> List<String> getValuesAsString(MessageQueryParameter<X> parameter) {
+		final List<X> values = parameter.getValue();
+		return values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+	}
+
+	abstract M getUnresolvedJarMessageParameters();
+
+	abstract M getJarMessageParameters(ProgramArgsParType programArgsParType);
+
+	abstract M getWrongJarMessageParameters(ProgramArgsParType programArgsParType);
+
+	abstract REQB getDefaultJarRequestBody();
+
+	abstract REQB getJarRequestBody(ProgramArgsParType programArgsParType);
+
+	abstract void handleRequest(HandlerRequest<REQB, M> request) throws Exception;
+
+	JobGraph validateDefaultGraph() {
+		JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
+		Assert.assertEquals(0, ParameterProgram.actualArguments.length);
+		Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
+		return jobGraph;
+	}
+
+	JobGraph validateGraph() {
+		JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
+		Assert.assertArrayEquals(PROG_ARGS, ParameterProgram.actualArguments);
+		Assert.assertEquals(PARALLELISM, getExecutionConfig(jobGraph).getParallelism());
+		return jobGraph;
+	}
+
+	private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
+		ExecutionConfig executionConfig;
+		try {
+			executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
+		} catch (Exception e) {
+			throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
+		}
+		return executionConfig;
+	}
+}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java
new file mode 100644
index 00000000000..1e496bff897
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+
+import org.junit.BeforeClass;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the parameter handling of the {@link JarPlanHandler}.
+ */
+public class JarPlanHandlerParameterTest extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> {
+	private static JarPlanHandler handler;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		init();
+		handler = new JarPlanHandler(
+			localAddressFuture,
+			gatewayRetriever,
+			timeout,
+			responseHeaders,
+			JarPlanHeaders.getInstance(),
+			jarDir,
+			new Configuration(),
+			executor,
+			jobGraph -> {
+				LAST_SUBMITTED_JOB_GRAPH_REFERENCE.set(jobGraph);
+				return new JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph));
+			});
+	}
+
+	@Override
+	JarPlanMessageParameters getUnresolvedJarMessageParameters() {
+		return handler.getMessageHeaders().getUnresolvedMessageParameters();
+	}
+
+	@Override
+	JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
+		final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters();
+		parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+		parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM));
+		if (programArgsParType == ProgramArgsParType.String ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgsQueryParameter.resolve(Collections.singletonList(String.join(" ", PROG_ARGS)));
+		}
+		if (programArgsParType == ProgramArgsParType.List ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS));
+		}
+		return parameters;
+	}
+
+	@Override
+	JarPlanMessageParameters getWrongJarMessageParameters(ProgramArgsParType programArgsParType) {
+		List<String> wrongArgs = Arrays.stream(PROG_ARGS).map(a -> a + "wrong").collect(Collectors.toList());
+		String argsWrongStr = String.join(" ", wrongArgs);
+		final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters();
+		parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+		parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+		if (programArgsParType == ProgramArgsParType.String || programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgsQueryParameter.resolve(Collections.singletonList(argsWrongStr));
+		}
+		if (programArgsParType == ProgramArgsParType.List ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgQueryParameter.resolve(wrongArgs);
+		}
+		return parameters;
+	}
+
+	@Override
+	JarPlanRequestBody getDefaultJarRequestBody() {
+		return new JarPlanRequestBody();
+	}
+
+	@Override
+	JarPlanRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
+		return new JarPlanRequestBody(
+			ParameterProgram.class.getCanonicalName(),
+			getProgramArgsString(programArgsParType),
+			getProgramArgsList(programArgsParType),
+			PARALLELISM);
+	}
+
+	@Override
+	void handleRequest(HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> request)
+		throws Exception {
+		handler.handleRequest(request, restfulGateway).get();
+	}
+}
+
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 8bb358a543c..c36f7c7802e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -18,89 +18,39 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.messages.MessageParameter;
-import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
  */
-public class JarRunHandlerParameterTest extends TestLogger {
+public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRequestBody, JarRunMessageParameters> {
+	private static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
+	private static final String RESTORE_PATH = "/foo/bar";
 
-	@ClassRule
-	public static final TemporaryFolder TMP = new TemporaryFolder();
-
-	@ClassRule
-	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
-
-	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
 	private static JarRunHandler handler;
-	private static Path jarWithManifest;
-	private static Path jarWithoutManifest;
-	private static TestingDispatcherGateway restfulGateway;
 
 	@BeforeClass
 	public static void setup() throws Exception {
-		Path jarDir = TMP.newFolder().toPath();
-
-		// properties are set property by surefire plugin
-		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
-		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
-		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
-
-		jarWithManifest = Files.copy(
-			jarLocation.resolve(parameterProgramJarName),
-			jarDir.resolve("program-with-manifest.jar"));
-		jarWithoutManifest = Files.copy(
-			jarLocation.resolve(parameterProgramWithoutManifestJarName),
-			jarDir.resolve("program-without-manifest.jar"));
-
-		Configuration config = new Configuration();
-		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
-			TMP.newFolder().getAbsolutePath());
-
-		restfulGateway = new TestingDispatcherGateway.Builder()
-			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
-			.setSubmitFunction(jobGraph -> {
-				lastSubmittedJobGraphReference.set(jobGraph);
-				return CompletableFuture.completedFuture(Acknowledge.get());
-			})
-			.build();
+		init();
 		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
 		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
 		final Time timeout = Time.seconds(10);
@@ -118,197 +68,87 @@ public static void setup() throws Exception {
 			executor);
 	}
 
-	@Before
-	public void reset() {
-		ParameterProgram.actualArguments = null;
+	@Override
+	JarRunMessageParameters getUnresolvedJarMessageParameters() {
+		return handler.getMessageHeaders().getUnresolvedMessageParameters();
 	}
 
-	@Test
-	public void testDefaultParameters() throws Exception {
-		// baseline, ensure that reasonable defaults are chosen
-		sendRequestAndValidateGraph(
-			handler,
-			restfulGateway,
-			() -> createRequest(
-				new JarRunRequestBody(),
-				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-				jarWithManifest
-			),
-			jobGraph -> {
-				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
-
-				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
-
-				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
-				Assert.assertNull(savepointRestoreSettings.getRestorePath());
-			}
-		);
-	}
-
-	@Test
-	public void testConfigurationViaQueryParameters() throws Exception {
-		// configure submission via query parameters
-		sendRequestAndValidateGraph(
-			handler,
-			restfulGateway,
-			() -> {
-				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
-				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
-				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
-				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
-				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
-
-				return createRequest(
-					new JarRunRequestBody(),
-					parameters,
-					jarWithoutManifest
-				);
-			},
-			jobGraph -> {
-				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
-				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
-				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
-				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
-				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
-
-				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
-
-				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
-			}
-		);
+	@Override
+	JarRunMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
+		final JarRunMessageParameters parameters = getUnresolvedJarMessageParameters();
+		parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(ALLOW_NON_RESTORED_STATE_QUERY));
+		parameters.savepointPathQueryParameter.resolve(Collections.singletonList(RESTORE_PATH));
+		parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+		parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM));
+		if (programArgsParType == ProgramArgsParType.String ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgsQueryParameter.resolve(Collections.singletonList(String.join(" ", PROG_ARGS)));
+		}
+		if (programArgsParType == ProgramArgsParType.List ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS));
+		}
+		return parameters;
 	}
 
-	@Test
-	public void testConfigurationViaJsonRequest() throws Exception {
-		sendRequestAndValidateGraph(
-			handler,
-			restfulGateway,
-			() -> {
-				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
-					ParameterProgram.class.getCanonicalName(),
-					"--host localhost --port 1234",
-					4,
-					true,
-					"/foo/bar"
-				);
-
-				return createRequest(
-					jsonRequest,
-					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-					jarWithoutManifest
-				);
-			},
-			jobGraph -> {
-				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
-				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
-				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
-				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
-				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
-
-				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
-
-				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
-			}
-		);
+	@Override
+	JarRunMessageParameters getWrongJarMessageParameters(ProgramArgsParType programArgsParType) {
+		List<String> wrongArgs = Arrays.stream(PROG_ARGS).map(a -> a + "wrong").collect(Collectors.toList());
+		String argsWrongStr = String.join(" ", wrongArgs);
+		final JarRunMessageParameters parameters = getUnresolvedJarMessageParameters();
+		parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
+		parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
+		parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+		parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+		if (programArgsParType == ProgramArgsParType.String ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgsQueryParameter.resolve(Collections.singletonList(argsWrongStr));
+		}
+		if (programArgsParType == ProgramArgsParType.List ||
+			programArgsParType == ProgramArgsParType.Both) {
+			parameters.programArgQueryParameter.resolve(wrongArgs);
+		}
+		return parameters;
 	}
 
-	@Test
-	public void testParameterPrioritization() throws Exception {
-		// configure submission via query parameters and JSON request, JSON should be prioritized
-		sendRequestAndValidateGraph(
-			handler,
-			restfulGateway,
-			() -> {
-				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
-					ParameterProgram.class.getCanonicalName(),
-					"--host localhost --port 1234",
-					4,
-					true,
-					"/foo/bar"
-				);
-
-				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
-				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
-				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
-				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
-				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
-
-				return createRequest(
-					jsonRequest,
-					parameters,
-					jarWithoutManifest
-				);
-			},
-			jobGraph -> {
-				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
-				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
-				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
-				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
-				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
-
-				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
-
-				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
-			}
-		);
+	@Override
+	JarRunRequestBody getDefaultJarRequestBody() {
+		return new JarRunRequestBody();
 	}
 
-	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
-			JarRunRequestBody requestBody,
-			JarRunMessageParameters parameters,
-			Path jar) throws HandlerRequestException {
-
-		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
-			.filter(MessageParameter::isResolved)
-			.collect(Collectors.toMap(
-				MessageParameter::getKey,
-				JarRunHandlerParameterTest::getValuesAsString
-			));
-
-		return new HandlerRequest<>(
-			requestBody,
-			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-			Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()),
-			queryParameterAsMap,
-			Collections.emptyList()
+	@Override
+	JarRunRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
+		return new JarRunRequestBody(
+			ParameterProgram.class.getCanonicalName(),
+			getProgramArgsString(programArgsParType),
+			getProgramArgsList(programArgsParType),
+			PARALLELISM,
+			ALLOW_NON_RESTORED_STATE_QUERY,
+			RESTORE_PATH
 		);
 	}
 
-	private static void sendRequestAndValidateGraph(
-			JarRunHandler handler,
-			DispatcherGateway dispatcherGateway,
-			SupplierWithException<HandlerRequest<JarRunRequestBody, JarRunMessageParameters>, HandlerRequestException> requestSupplier,
-			ThrowingConsumer<JobGraph, AssertionError> validator) throws Exception {
-
-		handler.handleRequest(requestSupplier.get(), dispatcherGateway)
-			.get();
-
-		JobGraph submittedJobGraph = lastSubmittedJobGraphReference.getAndSet(null);
-
-		validator.accept(submittedJobGraph);
+	@Override
+	void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
+		throws Exception {
+		handler.handleRequest(request, restfulGateway).get();
 	}
 
-	private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
-		ExecutionConfig executionConfig;
-		try {
-			executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
-		} catch (Exception e) {
-			throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
-		}
-		return executionConfig;
+	@Override
+	JobGraph validateDefaultGraph() {
+		JobGraph jobGraph = super.validateDefaultGraph();
+		final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+		Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
+		Assert.assertNull(savepointRestoreSettings.getRestorePath());
+		return jobGraph;
 	}
 
-	private static <X> List<String> getValuesAsString(MessageQueryParameter<X> parameter) {
-		final List<X> values = parameter.getValue();
-		return values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+	@Override
+	JobGraph validateGraph() {
+		JobGraph jobGraph = super.validateGraph();
+		final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+		Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+		Assert.assertEquals(RESTORE_PATH, savepointRestoreSettings.getRestorePath());
+		return jobGraph;
 	}
 }
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index 0706873c0fa..18fcd97d340 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -20,6 +20,8 @@
 
 import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
 
+import java.util.Arrays;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -33,10 +35,11 @@
 	}
 
 	@Override
-	protected JarRunRequestBody getTestRequestInstance() throws Exception {
+	protected JarRunRequestBody getTestRequestInstance() {
 		return new JarRunRequestBody(
 			"hello",
 			"world",
+			Arrays.asList("boo", "far"),
 			4,
 			true,
 			"foo/bar"
@@ -49,6 +52,7 @@ protected void assertOriginalEqualsToUnmarshalled(
 			final JarRunRequestBody actual) {
 		assertEquals(expected.getEntryClassName(), actual.getEntryClassName());
 		assertEquals(expected.getProgramArguments(), actual.getProgramArguments());
+		assertEquals(expected.getProgramArgumentsList(), actual.getProgramArgumentsList());
 		assertEquals(expected.getParallelism(), actual.getParallelism());
 		assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
 		assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
index e64c708dbe7..0753c494c63 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -127,8 +127,8 @@ private static JarListInfo listJars(JarListHandler handler, RestfulGateway restf
 
 	private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
 		JarPlanMessageParameters planParameters = JarPlanHeaders.getInstance().getUnresolvedMessageParameters();
-		HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
-			EmptyRequestBody.getInstance(),
+		HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
+			new JarPlanRequestBody(),
 			planParameters,
 			Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName),
 			Collections.emptyMap(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
index c738df821bb..8b18f9907f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
@@ -23,9 +23,12 @@
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.slf4j.Logger;
+
 import java.util.List;
 
 /**
@@ -63,4 +66,26 @@
 		return value;
 	}
 
+	/**
+	 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
+	 * if it is not null, otherwise returns the default value.
+	 */
+	public static <T> T fromRequestBodyOrQueryParameter(
+			T requestValue,
+			SupplierWithException<T, RestHandlerException> queryParameterExtractor,
+			T defaultValue,
+			Logger log) throws RestHandlerException {
+		if (requestValue != null) {
+			return requestValue;
+		} else {
+			T queryParameterValue = queryParameterExtractor.get();
+			if (queryParameterValue != null) {
+				log.warn("Configuring the job submission via query parameters is deprecated." +
+					" Please migrate to submitting a JSON request instead.");
+				return queryParameterValue;
+			} else {
+				return defaultValue;
+			}
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services