Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/11 03:29:32 UTC

[GitHub] [flink] czy006 opened a new pull request, #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

czy006 opened a new pull request, #21012:
URL: https://github.com/apache/flink/pull/21012

   # What is the purpose of the change
   
   Adds the FLIP-256 feature: support for passing a Flink configuration through the JobManager REST API.
   
   # Brief change log
   
   - Add Flink configuration support to the JobManager REST API jar run handler
   - Add tests for this feature
   
   # Verifying this change
   
   This change added tests and can be verified as follows:
   
   Added the integration test method `testProvideFlinkConfig` to **JarHandlerParameterTest** to check the precedence order of Flink job parameters. In this test, the external parameter does not set **parallelism.default**, while the internal parameter (the flinkconfig field) sets both **parallelism.default** and **task.cancellation.timeout**. The test shows that the **parallelism.default** value from the flinkconfig field does not take effect, because the external parameter has the highest priority, even though **parallelism.default** is not currently configured externally.
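
The precedence described above can be illustrated with a minimal sketch. It uses plain maps rather than Flink's `Configuration` class, and the class name, method name, and the exact layer order (handler configuration, then the flinkconfig field, then explicit request parameters) are assumptions made for illustration, not the PR's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the layering; not Flink's Configuration class. */
final class ConfigPrecedenceSketch {

    /**
     * Merges three layers, lowest priority first. Each later putAll
     * overwrites keys already set by an earlier layer.
     */
    static Map<String, String> effective(
            Map<String, String> handlerConfig,
            Map<String, String> requestFlinkConfig,
            Map<String, String> explicitRequestParams) {
        Map<String, String> merged = new LinkedHashMap<>(handlerConfig);
        merged.putAll(requestFlinkConfig);    // assumed middle layer: flinkconfig field
        merged.putAll(explicitRequestParams); // assumed top layer: external parameters
        return merged;
    }
}
```

Under these assumptions, an explicitly requested parallelism overrides the value in the flinkconfig field, while keys that only the flinkconfig field sets (such as **task.cancellation.timeout**) pass through unchanged.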
   
   # Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): (no)
   The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
   The serializers: (no)
   The runtime per-record code paths (performance sensitive): (no)
   Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
   The S3 file system connector: (no)
   
   # Documentation
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (the documentation update will follow in the next PR, [FLINK-29544](https://issues.apache.org/jira/browse/FLINK-29544))


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1016660433


##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,12 +92,12 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        final Configuration effectiveConfiguration = new Configuration(this.configuration);
         effectiveConfiguration.set(DeploymentOptions.ATTACHED, false);
         effectiveConfiguration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
 
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-        context.applyToConfiguration(effectiveConfiguration);
+        context.applyToConfiguration(effectiveConfiguration, request);
         SavepointRestoreSettings.toConfiguration(
                 getSavepointRestoreSettings(request), effectiveConfiguration);

Review Comment:
   Fixed. It no longer sets this from restFlinkConfig first; it now uses effectiveConfiguration.





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1006554349


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),

Review Comment:
   The method name follows getJarRequestBodyWithJobId, to keep the style consistent.





[GitHub] [flink] flinkbot commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1274042879

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "35800afb006d8268985089c7829eba83192a0ace",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "35800afb006d8268985089c7829eba83192a0ace",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 35800afb006d8268985089c7829eba83192a0ace UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1275495467

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1012593836


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   I found that this part cannot directly test the parallelism alone: when JarRunHandler runs, the value above still appears in flinkconfig. So I overwrite this value during unit testing, which is why I re-added the `JarHandlerParameterTest.testJarRequestBodyConfigEqToHandlerParameters` method.





[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1015217097


##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -130,24 +130,29 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
 
         final JarRunRequestBody requestBody = request.getRequestBody();
 
+        Configuration configuration = requestBody.getFlinkConfiguration();

Review Comment:
   why isn't this using the effective configuration?



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -68,6 +72,17 @@ enum ProgramArgsParType {
 
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
+    static final String RESTORE_PATH = "/foo/bar";
+    static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
+    static final RestoreMode RESTORE_MODE = RestoreMode.CLAIM;
+
+    static final Map<String, String> FLINK_CONFIGURATION =

Review Comment:
   Construct a `Configuration` and convert it to a map as needed instead.



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java:
##########
@@ -59,5 +63,11 @@ protected void assertOriginalEqualsToUnmarshalled(
                 .isEqualTo(expected.getAllowNonRestoredState());
         assertThat(actual.getSavepointPath()).isEqualTo(expected.getSavepointPath());
         assertThat(actual.getRestoreMode()).isEqualTo(expected.getRestoreMode());
+        assertThat(
+                        Maps.difference(
+                                        expected.getFlinkConfiguration().toMap(),
+                                        actual.getFlinkConfiguration().toMap())
+                                .areEqual())
+                .isEqualTo(true);

Review Comment:
   ```suggestion
           assertThat(actual.getFlinkConfiguration().toMap())
                   .containsExactlyEntriesOf(expected.getFlinkConfiguration().toMap());
   ```



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   > when we run JarRunHandler in the code, the above value will still appear in flinkconfig
   
   I don't get it. We are in full control in these tests as to whether the configuration is passed via the request body and what "flink configuration" is passed to the handler at construction time.
   Why would it be _impossible_ to test?





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1011107976


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   Sure, I think unit tests should cover all requested values, including values like:
   
   - DEFAULT_PARALLELISM
   - RESTORE_MODE
   - SAVEPOINT_IGNORE_UNCLAIMED_STATE
   - SAVEPOINT_PATH
   
   what do you think?





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1278668989

   @flinkbot run azure




[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r993456043


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java:
##########
@@ -90,7 +90,6 @@ static void setup(@TempDir File tempDir) throws Exception {
                 Files.copy(
                         jarLocation.resolve(parameterProgramWithEagerSink),
                         jarDir.resolve("program-with-eager-sink.jar"));
-

Review Comment:
   revert



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,7 +93,20 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        Map<String, String> requestJobFlinkConfig =
+                HandlerRequestUtils.fromRequestBodyOrQueryParameter(
+                        (request.getRequestBody()).getFlinkConfiguration(),
+                        () -> null,
+                        null,
+                        this.log);
+        Configuration effectiveConfiguration;
+        if (requestJobFlinkConfig != null) {
+            effectiveConfiguration = new Configuration();
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);

Review Comment:
   In this case we completely ignore `this.configuration`, which isn't correct.
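
The difference this comment points at can be sketched with plain maps. This is an illustration only: the class and method names are hypothetical, the maps stand in for Flink's `Configuration`, and the fallback branch of `requestOnly` is an assumption because the diff hunk above does not show it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; maps stand in for Flink's Configuration. */
final class EffectiveConfigSketch {

    /** Mirrors the flagged shape: a request config, when present, replaces everything. */
    static Map<String, String> requestOnly(
            Map<String, String> handlerConfig, Map<String, String> requestConfig) {
        if (requestConfig != null) {
            return new HashMap<>(requestConfig); // handlerConfig entries are lost here
        }
        return new HashMap<>(handlerConfig); // assumed fallback, not shown in the diff
    }

    /** Starts from a copy of the handler config, then overlays the request entries. */
    static Map<String, String> overlay(
            Map<String, String> handlerConfig, Map<String, String> requestConfig) {
        Map<String, String> effective = new HashMap<>(handlerConfig); // copy, so the input is not mutated
        if (requestConfig != null) {
            effective.putAll(requestConfig);
        }
        return effective;
    }
}
```

With `overlay`, settings that exist only in the handler's configuration remain visible to the submitted job, and the handler's own map is left untouched between requests.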





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1301845113

   @flinkbot run azure




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1293560621

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1006553366


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();

Review Comment:
   I've now added the logic for the JobPlanHandler part, so there is no need to special-case it.





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1304503453

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1015372020


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   Both **JarPlanHandler** and **JarRunHandler** are tested here, but JarPlanHandler has nothing to do with savepoint settings, so I extracted a method. If this part of the test needs to be simplified, what should I do? This is where I'm confused 🤔





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1016183568


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   I now do this with a new method that validates against the JobGraph.





[GitHub] [flink] zentol merged pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol merged PR #21012:
URL: https://github.com/apache/flink/pull/21012




[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1011660253


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   I only insist on testing the parallelism setting; the rest is up to you.





[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r995537283


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();

Review Comment:
   This isn't the case for the JarPlanHandler.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,7 +93,17 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        Map<String, String> requestJobFlinkConfig =
+                HandlerRequestUtils.fromRequestBodyOrQueryParameter(
+                        (request.getRequestBody()).getFlinkConfiguration(),
+                        () -> null,
+                        null,
+                        this.log);

Review Comment:
   ```suggestion
           Map<String, String> requestJobFlinkConfig = request.getRequestBody().getFlinkConfiguration();
   ```
   Since there's no query parameter it doesn't make sense to call this method.
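
To see why the call is redundant here, consider a minimal sketch of a body-or-query fallback. The names below are hypothetical and the behavior is an assumption about how such a helper resolves values; it is not a copy of `HandlerRequestUtils`.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of a "request body, else query parameter, else default" lookup. */
final class BodyOrQuerySketch {

    /** Returns the body value if present, otherwise the query value, otherwise the default. */
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryValue, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        T fromQuery = queryValue.get();
        return fromQuery != null ? fromQuery : defaultValue;
    }
}
```

When the query supplier always yields `null` and the default is `null`, as in the diff above, the result is always just the body value, so reading the field directly gives the same result.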



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   Why is this specific to the jar run handler?





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1006552316


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();

Review Comment:
   I've now added the logic for the JobPlanHandler part.





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1296123544

   @zentol PTAL




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1003931113


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   Do I need to open a new issue to complete this, or should I continue working on it in this issue?





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r996443211


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   This check is here because only the JarRunHandler can set flinkConfig, while this test also runs for the JarPlanHandler.





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1306725733

   @flinkbot run azure




[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1016396225


##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -130,24 +130,33 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
 
         final JarRunRequestBody requestBody = request.getRequestBody();
 
+        Configuration effectiveConfiguration = requestBody.getFlinkConfiguration();

Review Comment:
   ```suggestion
           final Configuration configuration = requestBody.getFlinkConfiguration();
   ```
   Just renaming this to `effectiveConfiguration` doesn't achieve anything. It is even misleading, because it's _not_ the effective configuration.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,12 +92,12 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        final Configuration effectiveConfiguration = new Configuration(this.configuration);
         effectiveConfiguration.set(DeploymentOptions.ATTACHED, false);
         effectiveConfiguration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
 
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-        context.applyToConfiguration(effectiveConfiguration);
+        context.applyToConfiguration(effectiveConfiguration, request);
         SavepointRestoreSettings.toConfiguration(
                 getSavepointRestoreSettings(request), effectiveConfiguration);

Review Comment:
   This doesn't behave correctly.
   
   If a savepoint setting was not set in the request then we end up setting it to the default value, overwriting what was configured on the cluster.
   
   That's what I meant with "Why isn't this using the `effectiveConfiguration`?". The fallback setting you derive in `getSavepointRestoreSettings` (which is always used as the final setting) must in part be based also on this `effectiveConfiguration`.
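
The fallback problem described in this comment can be illustrated with a simplified model that uses plain maps instead of Flink's `Configuration` and `SavepointRestoreSettings`. The key name, the sample path, and both `resolve*` helpers are hypothetical and exist only for this illustration; this is a minimal sketch under those assumptions, not the handler's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified model of the savepoint fallback issue; these are not Flink's classes. */
class SavepointFallbackSketch {

    // Hypothetical key, used only for this illustration.
    static final String SAVEPOINT_PATH_KEY = "savepoint.path";

    /** Problematic variant: a missing request value is replaced by a hard-coded default. */
    static Map<String, String> resolveIgnoringCluster(
            Map<String, String> clusterConfig, Optional<String> requestedPath) {
        Map<String, String> effective = new HashMap<>(clusterConfig);
        // Always writes, so a value configured on the cluster is lost.
        effective.put(SAVEPOINT_PATH_KEY, requestedPath.orElse("<none>"));
        return effective;
    }

    /** Variant that keeps the cluster value when the request does not set one. */
    static Map<String, String> resolveWithFallback(
            Map<String, String> clusterConfig, Optional<String> requestedPath) {
        Map<String, String> effective = new HashMap<>(clusterConfig);
        // Only overwrite when the request actually carries a value.
        requestedPath.ifPresent(path -> effective.put(SAVEPOINT_PATH_KEY, path));
        return effective;
    }

    public static void main(String[] args) {
        // Made-up cluster setting for demonstration purposes.
        Map<String, String> cluster = Map.of(SAVEPOINT_PATH_KEY, "/savepoints/sp-1");
        System.out.println(resolveIgnoringCluster(cluster, Optional.empty()));
        System.out.println(resolveWithFallback(cluster, Optional.empty()));
    }
}
```

In the second variant the request only overrides what it explicitly sets, which is the behavior the comment asks for.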





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1305007688

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r996454189


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();

Review Comment:
   The JarPlanHandler does not need to change. If I only want to test that the JarRunHandler's flinkConfig is applied, should I do that in this test method?





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1000065071


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   Does that mean I should implement the flinkConfig parsing logic in the JarPlanHandler as well, or should I write a separate test that only covers the JarRunHandler?





[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1000427810


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   You should also add the flinkConfig logic to the JarPlanHandler.





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1304771208

   @flinkbot run azure




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1311133140

   @zentol PTAL




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1293172070

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1006552851


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   I have now added the flinkConfig logic to the JarPlanHandler.





[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1009284356


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -69,6 +71,11 @@ enum ProgramArgsParType {
     static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"};
     static final int PARALLELISM = 4;
 
+    static final Map<String, String> FLINK_CONFIGURATION =
+            ImmutableMap.of(
+                    CoreOptions.DEFAULT_PARALLELISM.key(), "2",

Review Comment:
   This needs a test for the case where this key isn't set in the config, since we have special logic for it.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java:
##########
@@ -244,4 +245,15 @@ static List<String> tokenizeArguments(@Nullable final String args) {
         }
         return tokens;
     }
+
+    private static Integer getParallelism(JarRequestBody requestBody) {
+        Optional<Map<String, String>> optionalFlinkConfig =
+                Optional.ofNullable(requestBody.getFlinkConfiguration());
+        Optional<Integer> optionalParallelism = Optional.ofNullable(requestBody.getParallelism());
+        if (optionalParallelism.isPresent() || !optionalFlinkConfig.isPresent()) {
+            return requestBody.getParallelism();
+        }
+        return Integer.valueOf(
+                optionalFlinkConfig.get().get(CoreOptions.DEFAULT_PARALLELISM.key()));
+    }

Review Comment:
   This additionally needs a guard for the case where `optionalFlinkConfig` does not contain `DEFAULT_PARALLELISM`.
   This is probably easier to do if you construct a `Configuration`.
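
The guard requested here could look roughly like the following sketch. It models the request with a nullable `Integer` and a nullable `Map` rather than the PR's `JarRequestBody`, and the class and method names are hypothetical. The key string `parallelism.default` is the option named in the PR description.

```java
import java.util.Map;
import java.util.Optional;

/** Simplified model of the parallelism lookup; plain maps, not Flink's Configuration. */
class ParallelismLookupSketch {

    static final String DEFAULT_PARALLELISM_KEY = "parallelism.default";

    /**
     * Returns the explicit request parallelism if present, otherwise the value from the
     * request's flinkConfiguration, otherwise null so the caller can apply its own default.
     */
    static Integer resolveParallelism(
            Integer requestParallelism, Map<String, String> flinkConfig) {
        if (requestParallelism != null) {
            return requestParallelism;
        }
        return Optional.ofNullable(flinkConfig)
                // Optional.map yields an empty Optional when the key is absent,
                // so Integer.valueOf is never called with null.
                .map(config -> config.get(DEFAULT_PARALLELISM_KEY))
                .map(Integer::valueOf)
                .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(resolveParallelism(4, Map.of(DEFAULT_PARALLELISM_KEY, "2")));
        System.out.println(resolveParallelism(null, Map.of(DEFAULT_PARALLELISM_KEY, "2")));
        System.out.println(resolveParallelism(null, Map.of()));
    }
}
```

A non-numeric value would still raise a `NumberFormatException` in this sketch; how that should be reported to the REST client is left open.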



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java:
##########
@@ -114,4 +123,10 @@ public Integer getParallelism() {
     public JobID getJobId() {
         return jobId;
     }
+
+    @Nullable
+    @JsonIgnore
+    public Map<String, String> getFlinkConfiguration() {
+        return flinkConfiguration;
+    }

Review Comment:
   Consider returning a non-nullable `Configuration`. This should simplify other operations.
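
One way to read this suggestion, sketched with a plain `Map` rather than Flink's `Configuration`: normalize a missing value once when the request body is built, so callers never need a null check. `RequestBodySketch` is a hypothetical stand-in, not the PR's `JarRequestBody`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical request body that models only the flinkConfiguration field. */
class RequestBodySketch {

    private final Map<String, String> flinkConfiguration;

    RequestBodySketch(Map<String, String> flinkConfiguration) {
        // Normalize null to an empty map once, at construction time.
        this.flinkConfiguration =
                flinkConfiguration == null
                        ? Collections.<String, String>emptyMap()
                        : Collections.unmodifiableMap(new HashMap<>(flinkConfiguration));
    }

    /** Never returns null, so callers can iterate or look up keys directly. */
    Map<String, String> getFlinkConfiguration() {
        return flinkConfiguration;
    }

    public static void main(String[] args) {
        System.out.println(new RequestBodySketch(null).getFlinkConfiguration().isEmpty());
    }
}
```

With a getter like this, handler code can apply the entries unconditionally instead of branching on `!= null`.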



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java:
##########
@@ -38,6 +39,8 @@ public class JarRunRequestBody extends JarRequestBody {
     private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
     private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";
 
+    private static final String FIELD_NAME_FLINK_CONFIGURATION = "flinkConfiguration";

Review Comment:
   Re-use the field name constant from `JarRequestBody` instead.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,7 +92,13 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);
+        if (requestJobFlinkConfig != null) {
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);
+        }

Review Comment:
   consider integrating this into `JarHandlerContext#applyToConfiguration`



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java:
##########
@@ -244,4 +245,15 @@ static List<String> tokenizeArguments(@Nullable final String args) {
         }
         return tokens;
     }
+
+    private static Integer getParallelism(JarRequestBody requestBody) {
+        Optional<Map<String, String>> optionalFlinkConfig =
+                Optional.ofNullable(requestBody.getFlinkConfiguration());
+        Optional<Integer> optionalParallelism = Optional.ofNullable(requestBody.getParallelism());
+        if (optionalParallelism.isPresent() || !optionalFlinkConfig.isPresent()) {
+            return requestBody.getParallelism();
+        }
+        return Integer.valueOf(
+                optionalFlinkConfig.get().get(CoreOptions.DEFAULT_PARALLELISM.key()));
+    }

Review Comment:
   ```suggestion
           return optionalParallelism.orElse(
                   optionalFlinkConfig
                           .map(map -> map.get(CoreOptions.DEFAULT_PARALLELISM.key()))
                           .map(Integer::valueOf)
                           .orElse(null));
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =

Review Comment:
   ```suggestion
           final Map<String, String> requestJobFlinkConfig =
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);

Review Comment:
   ```suggestion
           final Configuration effectiveConfiguration = new Configuration(this.configuration);
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java:
##########
@@ -98,13 +98,18 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
         final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);
-
+        Map<String, String> requestJobFlinkConfig =
+                request.getRequestBody().getFlinkConfiguration();
+        Configuration effectiveConfiguration = new Configuration(this.configuration);
+        if (requestJobFlinkConfig != null) {
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);

Review Comment:
   ```suggestion
               effectiveConfiguration.addAll(Configuration.fromMap(requestJobFlinkConfig));
   ```
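
The layering that this suggestion relies on (copy the cluster configuration, then let the request's entries override it) can be sketched with plain maps. This is only a model of the intended semantics; `ConfigLayeringSketch` and `layer` are hypothetical names, and the sample values are made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Models "cluster config first, request config on top" with plain maps. */
class ConfigLayeringSketch {

    static Map<String, String> layer(
            Map<String, String> clusterConfig, Map<String, String> requestConfig) {
        // Copy first so the handler's own configuration is never mutated.
        Map<String, String> effective = new HashMap<>(clusterConfig);
        // Entries from the request replace entries with the same key.
        effective.putAll(requestConfig);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put("parallelism.default", "1");
        cluster.put("task.cancellation.timeout", "1000");

        Map<String, String> request = new HashMap<>();
        request.put("parallelism.default", "2");

        System.out.println(layer(cluster, request));
    }
}
```

Keys that the request does not mention keep their cluster values, which is why the copy must start from the cluster configuration rather than from an empty one.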





[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1301753419

   @flinkbot run azure




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1304737708

   @flinkbot run azure




[GitHub] [flink] czy006 commented on pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on PR #21012:
URL: https://github.com/apache/flink/pull/21012#issuecomment-1310332504

   @flinkbot run azure




[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r995518693


##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java:
##########
@@ -92,7 +93,20 @@ public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
 
-        final Configuration effectiveConfiguration = new Configuration(configuration);
+        Map<String, String> requestJobFlinkConfig =
+                HandlerRequestUtils.fromRequestBodyOrQueryParameter(
+                        (request.getRequestBody()).getFlinkConfiguration(),
+                        () -> null,
+                        null,
+                        this.log);
+        Configuration effectiveConfiguration;
+        if (requestJobFlinkConfig != null) {
+            effectiveConfiguration = new Configuration();
+            requestJobFlinkConfig.forEach(effectiveConfiguration::setString);

Review Comment:
   I am now changing it to
   `Configuration effectiveConfiguration = new Configuration(this.configuration);`
   





[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r996443211


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   This check is here because only the JarRunHandler can set flinkConfig, while this test also runs for the JarPlanHandler.





[GitHub] [flink] zentol commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r996925478


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),

Review Comment:
   ```suggestion
                           jarRequestBodyWithFlinkConfig,
   ```



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));
+        if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   It is actually a bit questionable to have this only work for the JarRunHandler; the 2 handlers should generally have the same capabilities; the whole point of the JarPlanHandler is to get a preview of the job after all.



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##########
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
         assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
     }
 
+    @Test
+    void testProvideFlinkConfig() throws Exception {
+
+        REQB jarRequestBodyWithFlinkConfig = getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+        HandlerRequest<REQB> request =
+                createRequest(
+                        getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest);
+
+        handleRequest(request);
+
+        Optional<JobGraph> jobGraph = getLastSubmittedJobGraphAndReset();
+
+        assertThat(jobGraph.isPresent()).isTrue();
+        JobGraph graph = jobGraph.get();
+
+        assertThat(getExecutionConfig(graph).getParallelism())
+                .isNotEqualTo(
+                        Integer.valueOf(
+                                FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key())));

Review Comment:
   I don't understand why this assertion is passing. For the JarPlanHandler, sure, because it doesn't support the configuration, but why isn't this setting honored by the JarRunHandler?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org