You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/07/13 12:08:30 UTC

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6330

    [FLINK-9499][rest] Support JSON request in JarHandlers

    ## What is the purpose of the change
    
    With this PR it is now possible to configure the submission via the `JarRunHandler` with a JSON payload instead of query parameters.
    This has a number of advantages:
    * savepoint paths no longer have to be escaped to ridiculous degrees
    * program arguments are no longer limited in length (issue raised in FLINK9499)
    * arguments are transmitted as is and aren't subject to query parameter parsing rules (FLINK-9832)
    
    The handler now accepts the configuration both via JSON and query parameters, with JSON being prioritized. The WebUI makes use of this to be compatible with both new and legacy handlers, which is pretty rad.
    
    Note that the `JarPlanHandler` still suffers from the problems listed above. While this handler technically works similar to the `JarRunHandler` (in fact we _could_ adjust it to work the same way) it unfortunately is a `GET` request, for which we do not allow payloads.
    
    This PR shares some code with #6311.
    * simplification of dispatcher host retrieval
    * introduction of `BlobServerResource`
    
    ## Brief change log
    
    * added `JarRunRequestBody`, with optional fields for all parameters
    * modified `JarRunHandler` to accept parameters both via JSON and query parameters
    * modified WebUI to submit configuration both via JSON and query parameters
      * new handlers will ignore query parameters
      * legacy handlers will ignore JSON
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    * build `flink-runtime-web` (this step is important since we build jar for the test)
    * run `JarRunHandlerParameterTest`
    
    The test verifies the successful submission with parameters
    * not being specified at all
    * being specified via query parameters
    * being specified via JSON request
    * being specified via both query parameters and JSON request.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9499

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6330
    
----
commit 662eb3baa013a1a0eddf9ca0e4ae08caa9cccc48
Author: zentol <ch...@...>
Date:   2018-07-13T12:00:10Z

    [FLINK-9499][rest] Support JSON request in JarHandlers

----


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203302518
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java ---
    @@ -0,0 +1,109 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * {@link RequestBody} for running a jar.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JarRunRequestBody implements RequestBody {
    +
    +	private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
    +	private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
    +	private static final String FIELD_NAME_PARALLELISM = "parallelism";
    +	private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
    +	private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
    +
    +	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
    +	@Nullable
    +	private String entryClassName;
    +
    +	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
    +	@Nullable
    +	private String programArguments;
    +
    +	@JsonProperty(FIELD_NAME_PARALLELISM)
    +	@Nullable
    +	private Integer parallelism;
    +
    +	@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
    +	@Nullable
    +	private Boolean allowNonRestoredState;
    +
    +	@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
    +	@Nullable
    +	private String savepointPath;
    +
    +	public JarRunRequestBody() {
    +		this(null, null, null, null, null);
    +	}
    +
    +	@JsonCreator
    +	public JarRunRequestBody(
    +			@JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
    +			@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
    +			@JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
    +			@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
    +			@JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
    --- End diff --
    
    yes they should be nullable


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203359445
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    +
    +	@ClassRule
    +	public static final TemporaryFolder TMP = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    +
    +	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
    +	private static JarRunHandler handler;
    +	private static Path jarWithManifest;
    +	private static Path jarWithoutManifest;
    +	private static TestingDispatcherGateway restfulGateway;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Path jarDir = TMP.newFolder().toPath();
    +
    +		// properties are set property by surefire plugin
    +		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
    +		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
    +		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
    +
    +		jarWithManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramJarName),
    +			jarDir.resolve("program-with-manifest.jar"));
    +		jarWithoutManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramWithoutManifestJarName),
    +			jarDir.resolve("program-without-manifest.jar"));
    +
    +		Configuration config = new Configuration();
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			TMP.newFolder().getAbsolutePath());
    +
    +		restfulGateway = new TestingDispatcherGateway.Builder()
    +			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
    +			.setSubmitFunction(jobGraph -> {
    +				lastSubmittedJobGraphReference.set(jobGraph);
    +				return CompletableFuture.completedFuture(Acknowledge.get());
    +			})
    +			.build();
    +		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
    +		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    +		final Time timeout = Time.seconds(10);
    +		final Map<String, String> responseHeaders = Collections.emptyMap();
    +		final Executor executor = TestingUtils.defaultExecutor();
    +
    +		handler = new JarRunHandler(
    +			localAddressFuture,
    +			gatewayRetriever,
    +			timeout,
    +			responseHeaders,
    +			JarRunHeaders.getInstance(),
    +			jarDir,
    +			new Configuration(),
    +			executor);
    +	}
    +
    +	@Before
    +	public void reset() {
    +		ParameterProgram.actualArguments = null;
    +	}
    +
    +	@Test
    +	public void testDefaultParameters() throws Exception {
    +		// baseline, ensure that reasonable defaults are chosen
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> createRequest(
    +				new JarRunRequestBody(),
    +				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +				jarWithManifest
    +			),
    +			jobGraph -> {
    +				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
    +
    +				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertNull(savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaQueryParameters() throws Exception {
    +		// configure submission via query parameters
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
    +
    +				return createRequest(
    +					new JarRunRequestBody(),
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaJsonRequest() throws Exception {
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				return createRequest(
    +					jsonRequest,
    +					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testParameterPrioritization() throws Exception {
    +		// configure submission via query parameters and JSON request, JSON should be prioritized
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
    +
    +				return createRequest(
    +					jsonRequest,
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
    +			JarRunRequestBody requestBody,
    +			JarRunMessageParameters parameters,
    +			Path jar) throws HandlerRequestException {
    +
    +		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
    +			.filter(MessageParameter::isResolved)
    +			.collect(Collectors.toMap(
    +				MessageParameter::getKey,
    +				JarRunHandlerParameterTest::getValuesAsString
    +			));
    +
    +		return new HandlerRequest<>(
    +			requestBody,
    +			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    --- End diff --
    
    Ah I see. You're right. I think this could be a nice improvement to initialize the `MessageParameter` instance before giving it to the `HandlerRequest`. But this is out of scope for this PR.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203303832
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -138,12 +154,22 @@ public JarRunHandler(
     			});
     	}
     
    -	private static SavepointRestoreSettings getSavepointRestoreSettings(
    -			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
    +	private SavepointRestoreSettings getSavepointRestoreSettings(
    +			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
     				throws RestHandlerException {
     
    -		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
    -		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
    +		final JarRunRequestBody requestBody = request.getRequestBody();
    +
    +		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
    +			requestBody.getAllowNonRestoredState(),
    +			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
    +			false,
    +			log);
    +		final String savepointPath = fromRequestBodyOrQueryParameter(
    --- End diff --
    
    This could result in unexpected NullPointerExceptions when retrieving a primitive, like in the following example:
    ```
    fromRequestBodyOrQueryParameter(
    		requestBody.getParallelism(),
    		() -> getQueryParameter(request, ParallelismQueryParameter.class)
    		log);
    ```
    The explicit default argument prevents that from happening.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203288959
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    --- End diff --
    
    Should extend `TestLogger`.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203302981
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    +
    +	@ClassRule
    +	public static final TemporaryFolder TMP = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    +
    +	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
    +	private static JarRunHandler handler;
    +	private static Path jarWithManifest;
    +	private static Path jarWithoutManifest;
    +	private static TestingDispatcherGateway restfulGateway;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Path jarDir = TMP.newFolder().toPath();
    +
    +		// properties are set property by surefire plugin
    +		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
    +		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
    +		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
    +
    +		jarWithManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramJarName),
    +			jarDir.resolve("program-with-manifest.jar"));
    +		jarWithoutManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramWithoutManifestJarName),
    +			jarDir.resolve("program-without-manifest.jar"));
    +
    +		Configuration config = new Configuration();
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			TMP.newFolder().getAbsolutePath());
    +
    +		restfulGateway = new TestingDispatcherGateway.Builder()
    +			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
    +			.setSubmitFunction(jobGraph -> {
    +				lastSubmittedJobGraphReference.set(jobGraph);
    +				return CompletableFuture.completedFuture(Acknowledge.get());
    +			})
    +			.build();
    +		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
    +		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    +		final Time timeout = Time.seconds(10);
    +		final Map<String, String> responseHeaders = Collections.emptyMap();
    +		final Executor executor = TestingUtils.defaultExecutor();
    +
    +		handler = new JarRunHandler(
    +			localAddressFuture,
    +			gatewayRetriever,
    +			timeout,
    +			responseHeaders,
    +			JarRunHeaders.getInstance(),
    +			jarDir,
    +			new Configuration(),
    +			executor);
    +	}
    +
    +	@Before
    +	public void reset() {
    +		ParameterProgram.actualArguments = null;
    +	}
    +
    +	@Test
    +	public void testDefaultParameters() throws Exception {
    +		// baseline, ensure that reasonable defaults are chosen
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> createRequest(
    +				new JarRunRequestBody(),
    +				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +				jarWithManifest
    +			),
    +			jobGraph -> {
    +				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
    +
    +				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertNull(savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaQueryParameters() throws Exception {
    +		// configure submission via query parameters
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
    +
    +				return createRequest(
    +					new JarRunRequestBody(),
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaJsonRequest() throws Exception {
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				return createRequest(
    +					jsonRequest,
    +					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testParameterPrioritization() throws Exception {
    +		// configure submission via query parameters and JSON request, JSON should be prioritized
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
    +
    +				return createRequest(
    +					jsonRequest,
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
    +			JarRunRequestBody requestBody,
    +			JarRunMessageParameters parameters,
    +			Path jar) throws HandlerRequestException {
    +
    +		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
    +			.filter(MessageParameter::isResolved)
    +			.collect(Collectors.toMap(
    +				MessageParameter::getKey,
    +				JarRunHandlerParameterTest::getValuesAsString
    +			));
    +
    +		return new HandlerRequest<>(
    +			requestBody,
    +			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    --- End diff --
    
    `HandlerRequest` doesn't work like that.
    It takes the parameter map from netty and inserts them into the `MessageParameters`.
    We could move logic into a static factory method and have the request be a simple container, but changing that would be out-of-scope of this PR.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203365784
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -138,12 +154,22 @@ public JarRunHandler(
     			});
     	}
     
    -	private static SavepointRestoreSettings getSavepointRestoreSettings(
    -			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
    +	private SavepointRestoreSettings getSavepointRestoreSettings(
    +			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
     				throws RestHandlerException {
     
    -		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
    -		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
    +		final JarRunRequestBody requestBody = request.getRequestBody();
    +
    +		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
    +			requestBody.getAllowNonRestoredState(),
    +			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
    +			false,
    +			log);
    +		final String savepointPath = fromRequestBodyOrQueryParameter(
    --- End diff --
    
    True, making the specification of the default value explicit will better guard against this. In the other case, the user would have to know the difference between these methods and which to apply to primitive and non-primitive values. Alright, then it's good to go from my side.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203358033
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -138,12 +154,22 @@ public JarRunHandler(
     			});
     	}
     
    -	private static SavepointRestoreSettings getSavepointRestoreSettings(
    -			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
    +	private SavepointRestoreSettings getSavepointRestoreSettings(
    +			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
     				throws RestHandlerException {
     
    -		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
    -		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
    +		final JarRunRequestBody requestBody = request.getRequestBody();
    +
    +		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
    +			requestBody.getAllowNonRestoredState(),
    +			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
    +			false,
    +			log);
    +		final String savepointPath = fromRequestBodyOrQueryParameter(
    --- End diff --
    
    Wouldn't something like this work:
    ```
    private static <T> T fromRequestBodyOrQueryParameter(
    			T requestValue,
    			SupplierWithException<T, RestHandlerException> queryParameterExtractor,
    			Logger log) throws RestHandlerException {
                     return fromRequestBodyOrQueryParameter(requestValue, queryParameterExtractor, null, log);
    	}
    ```


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203309580
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java ---
    @@ -0,0 +1,109 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * {@link RequestBody} for running a jar.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JarRunRequestBody implements RequestBody {
    +
    +	private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
    +	private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
    +	private static final String FIELD_NAME_PARALLELISM = "parallelism";
    +	private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
    +	private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
    +
    +	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
    +	@Nullable
    +	private String entryClassName;
    +
    +	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
    +	@Nullable
    +	private String programArguments;
    +
    +	@JsonProperty(FIELD_NAME_PARALLELISM)
    +	@Nullable
    +	private Integer parallelism;
    +
    +	@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
    +	@Nullable
    +	private Boolean allowNonRestoredState;
    +
    +	@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
    +	@Nullable
    +	private String savepointPath;
    +
    +	public JarRunRequestBody() {
    +		this(null, null, null, null, null);
    +	}
    +
    +	@JsonCreator
    +	public JarRunRequestBody(
    +			@JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
    +			@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
    +			@JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
    +			@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
    +			@JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
    --- End diff --
    
    For example, if only a partial body is sent some fields may be null. I couldn't quickly find a way to allow either all fields or non to be null.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6330


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203287485
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -138,12 +154,22 @@ public JarRunHandler(
     			});
     	}
     
    -	private static SavepointRestoreSettings getSavepointRestoreSettings(
    -			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
    +	private SavepointRestoreSettings getSavepointRestoreSettings(
    +			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
     				throws RestHandlerException {
     
    -		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
    -		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
    +		final JarRunRequestBody requestBody = request.getRequestBody();
    +
    +		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
    +			requestBody.getAllowNonRestoredState(),
    +			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
    +			false,
    +			log);
    +		final String savepointPath = fromRequestBodyOrQueryParameter(
    --- End diff --
    
    Maybe we could add a `fromRequestBodyOrQueryParameter` method which does not take a default value as a convenience function. That way we would not have all the calls where we pass `null`.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203291791
  
    --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.dispatcher.DispatcherGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.HandlerRequestException;
    +import org.apache.flink.runtime.rest.messages.MessageParameter;
    +import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.runtime.util.BlobServerResource;
    +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Tests for the parameter handling of the {@link JarRunHandler}.
    + */
    +public class JarRunHandlerParameterTest {
    +
    +	@ClassRule
    +	public static final TemporaryFolder TMP = new TemporaryFolder();
    +
    +	@ClassRule
    +	public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    +
    +	private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
    +	private static JarRunHandler handler;
    +	private static Path jarWithManifest;
    +	private static Path jarWithoutManifest;
    +	private static TestingDispatcherGateway restfulGateway;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Path jarDir = TMP.newFolder().toPath();
    +
    +		// properties are set property by surefire plugin
    +		final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar";
    +		final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
    +		final Path jarLocation = Paths.get(System.getProperty("targetDir"));
    +
    +		jarWithManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramJarName),
    +			jarDir.resolve("program-with-manifest.jar"));
    +		jarWithoutManifest = Files.copy(
    +			jarLocation.resolve(parameterProgramWithoutManifestJarName),
    +			jarDir.resolve("program-without-manifest.jar"));
    +
    +		Configuration config = new Configuration();
    +		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +			TMP.newFolder().getAbsolutePath());
    +
    +		restfulGateway = new TestingDispatcherGateway.Builder()
    +			.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
    +			.setSubmitFunction(jobGraph -> {
    +				lastSubmittedJobGraphReference.set(jobGraph);
    +				return CompletableFuture.completedFuture(Acknowledge.get());
    +			})
    +			.build();
    +		final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
    +		final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    +		final Time timeout = Time.seconds(10);
    +		final Map<String, String> responseHeaders = Collections.emptyMap();
    +		final Executor executor = TestingUtils.defaultExecutor();
    +
    +		handler = new JarRunHandler(
    +			localAddressFuture,
    +			gatewayRetriever,
    +			timeout,
    +			responseHeaders,
    +			JarRunHeaders.getInstance(),
    +			jarDir,
    +			new Configuration(),
    +			executor);
    +	}
    +
    +	@Before
    +	public void reset() {
    +		ParameterProgram.actualArguments = null;
    +	}
    +
    +	@Test
    +	public void testDefaultParameters() throws Exception {
    +		// baseline, ensure that reasonable defaults are chosen
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> createRequest(
    +				new JarRunRequestBody(),
    +				JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +				jarWithManifest
    +			),
    +			jobGraph -> {
    +				Assert.assertEquals(0, ParameterProgram.actualArguments.length);
    +
    +				Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertNull(savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaQueryParameters() throws Exception {
    +		// configure submission via query parameters
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
    +
    +				return createRequest(
    +					new JarRunRequestBody(),
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testConfigurationViaJsonRequest() throws Exception {
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				return createRequest(
    +					jsonRequest,
    +					JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	@Test
    +	public void testParameterPrioritization() throws Exception {
    +		// configure submission via query parameters and JSON request, JSON should be prioritized
    +		sendRequestAndValidateGraph(
    +			handler,
    +			restfulGateway,
    +			() -> {
    +				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
    +					ParameterProgram.class.getCanonicalName(),
    +					"--host localhost --port 1234",
    +					4,
    +					true,
    +					"/foo/bar"
    +				);
    +
    +				final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
    +				parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
    +				parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
    +				parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
    +				parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
    +				parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
    +
    +				return createRequest(
    +					jsonRequest,
    +					parameters,
    +					jarWithoutManifest
    +				);
    +			},
    +			jobGraph -> {
    +				Assert.assertEquals(4, ParameterProgram.actualArguments.length);
    +				Assert.assertEquals("--host", ParameterProgram.actualArguments[0]);
    +				Assert.assertEquals("localhost", ParameterProgram.actualArguments[1]);
    +				Assert.assertEquals("--port", ParameterProgram.actualArguments[2]);
    +				Assert.assertEquals("1234", ParameterProgram.actualArguments[3]);
    +
    +				Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());
    +
    +				final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
    +				Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
    +				Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    +			}
    +		);
    +	}
    +
    +	private static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(
    +			JarRunRequestBody requestBody,
    +			JarRunMessageParameters parameters,
    +			Path jar) throws HandlerRequestException {
    +
    +		final Map<String, List<String>> queryParameterAsMap = parameters.getQueryParameters().stream()
    +			.filter(MessageParameter::isResolved)
    +			.collect(Collectors.toMap(
    +				MessageParameter::getKey,
    +				JarRunHandlerParameterTest::getValuesAsString
    +			));
    +
    +		return new HandlerRequest<>(
    +			requestBody,
    +			JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
    --- End diff --
    
    Can't we simply give the `parameters` parameter here instead of creating a new `JarRunMessageParameters` instance? Then we could also avoid having to create the `queryParametersAsMap` map.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203361773
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---
    @@ -138,12 +154,22 @@ public JarRunHandler(
     			});
     	}
     
    -	private static SavepointRestoreSettings getSavepointRestoreSettings(
    -			final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
    +	private SavepointRestoreSettings getSavepointRestoreSettings(
    +			final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
     				throws RestHandlerException {
     
    -		final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
    -		final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class);
    +		final JarRunRequestBody requestBody = request.getRequestBody();
    +
    +		final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter(
    +			requestBody.getAllowNonRestoredState(),
    +			() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
    +			false,
    +			log);
    +		final String savepointPath = fromRequestBodyOrQueryParameter(
    --- End diff --
    
    How does this prevent the scenario i described?


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r202330424
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java ---
    @@ -0,0 +1,109 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * {@link RequestBody} for running a jar.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JarRunRequestBody implements RequestBody {
    --- End diff --
    
    missing marshalling test


---

[GitHub] flink issue #6330: [FLINK-9499][rest] Support JSON request in JarHandlers

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6330
  
    Currently preparing the documentation changes.


---

[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6330#discussion_r203288895
  
    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java ---
    @@ -0,0 +1,109 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * {@link RequestBody} for running a jar.
    + */
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class JarRunRequestBody implements RequestBody {
    +
    +	private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
    +	private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
    +	private static final String FIELD_NAME_PARALLELISM = "parallelism";
    +	private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
    +	private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
    +
    +	@JsonProperty(FIELD_NAME_ENTRY_CLASS)
    +	@Nullable
    +	private String entryClassName;
    +
    +	@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
    +	@Nullable
    +	private String programArguments;
    +
    +	@JsonProperty(FIELD_NAME_PARALLELISM)
    +	@Nullable
    +	private Integer parallelism;
    +
    +	@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
    +	@Nullable
    +	private Boolean allowNonRestoredState;
    +
    +	@JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
    +	@Nullable
    +	private String savepointPath;
    +
    +	public JarRunRequestBody() {
    +		this(null, null, null, null, null);
    +	}
    +
    +	@JsonCreator
    +	public JarRunRequestBody(
    +			@JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName,
    +			@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments,
    +			@JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism,
    +			@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState,
    +			@JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) {
    --- End diff --
    
    If the fields are `@Nullable` shouldn't the constructor parameters be also `@Nullable`? Or are the fields only `@Nullable` because of the default constructor?


---