You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/07/26 09:18:11 UTC

[flink] branch master updated: [FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b107835c8 [FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend
c4b107835c8 is described below

commit c4b107835c8bdae0667efcbdfe52aa3a34aec894
Author: Paul Zhang <xz...@126.com>
AuthorDate: Thu May 19 16:07:07 2022 +0800

    [FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend
    
    This closes #19772.
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  84 +++++++++-----
 .../cli/CliFrontendDynamicPropertiesTest.java      | 122 +++++++++++++++------
 2 files changed, 143 insertions(+), 63 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index caa5f8f242e..652df51c292 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -110,10 +110,6 @@ public class CliFrontend {
 
     private final Options customCommandLineOptions;
 
-    private final Duration clientTimeout;
-
-    private final int defaultParallelism;
-
     private final ClusterClientServiceLoader clusterClientServiceLoader;
 
     public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
@@ -137,9 +133,6 @@ public class CliFrontend {
             customCommandLine.addGeneralOptions(customCommandLineOptions);
             customCommandLine.addRunOptions(customCommandLineOptions);
         }
-
-        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
-        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -340,11 +333,6 @@ public class CliFrontend {
         PackagedProgram program = null;
 
         try {
-            int parallelism = programOptions.getParallelism();
-            if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
-                parallelism = defaultParallelism;
-            }
-
             LOG.info("Creating program plan dump");
 
             final CustomCommandLine activeCommandLine =
@@ -359,6 +347,11 @@ public class CliFrontend {
 
             program = buildProgram(programOptions, effectiveConfiguration);
 
+            int parallelism = programOptions.getParallelism();
+            if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
+                parallelism = getDefaultParallelism(effectiveConfiguration);
+            }
+
             Pipeline pipeline =
                     PackagedProgramUtils.getPipelineFromProgram(
                             program, effectiveConfiguration, parallelism, true);
@@ -428,7 +421,8 @@ public class CliFrontend {
         runClusterAction(
                 activeCommandLine,
                 commandLine,
-                clusterClient -> listJobs(clusterClient, showRunning, showScheduled, showAll));
+                (clusterClient, effectiveConfiguration) ->
+                        listJobs(clusterClient, showRunning, showScheduled, showAll));
     }
 
     private <ClusterID> void listJobs(
@@ -570,10 +564,11 @@ public class CliFrontend {
                         + " savepoint.");
 
         final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
+
         runClusterAction(
                 activeCommandLine,
                 commandLine,
-                clusterClient -> {
+                (clusterClient, effectiveConfiguration) -> {
                     final String savepointPath;
                     try {
                         savepointPath =
@@ -583,7 +578,9 @@ public class CliFrontend {
                                                 advanceToEndOfEventTime,
                                                 targetDirectory,
                                                 formatType)
-                                        .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                                        .get(
+                                                getClientTimeout(effectiveConfiguration).toMillis(),
+                                                TimeUnit.MILLISECONDS);
                     } catch (Exception e) {
                         throw new FlinkException(
                                 "Could not stop with a savepoint job \"" + jobId + "\".", e);
@@ -653,13 +650,16 @@ public class CliFrontend {
             runClusterAction(
                     activeCommandLine,
                     commandLine,
-                    clusterClient -> {
+                    (clusterClient, effectiveConfiguration) -> {
                         final String savepointPath;
                         try {
                             savepointPath =
                                     clusterClient
                                             .cancelWithSavepoint(jobId, targetDirectory, formatType)
-                                            .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                                            .get(
+                                                    getClientTimeout(effectiveConfiguration)
+                                                            .toMillis(),
+                                                    TimeUnit.MILLISECONDS);
                         } catch (Exception e) {
                             throw new FlinkException("Could not cancel job " + jobId + '.', e);
                         }
@@ -684,11 +684,13 @@ public class CliFrontend {
             runClusterAction(
                     activeCommandLine,
                     commandLine,
-                    clusterClient -> {
+                    (clusterClient, effectiveConfiguration) -> {
                         try {
                             clusterClient
                                     .cancel(jobId)
-                                    .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                                    .get(
+                                            getClientTimeout(effectiveConfiguration).toMillis(),
+                                            TimeUnit.MILLISECONDS);
                         } catch (Exception e) {
                             throw new FlinkException("Could not cancel job " + jobId + '.', e);
                         }
@@ -735,8 +737,11 @@ public class CliFrontend {
             runClusterAction(
                     activeCommandLine,
                     commandLine,
-                    clusterClient ->
-                            disposeSavepoint(clusterClient, savepointOptions.getSavepointPath()));
+                    (clusterClient, effectiveConfiguration) ->
+                            disposeSavepoint(
+                                    clusterClient,
+                                    savepointOptions.getSavepointPath(),
+                                    getClientTimeout(effectiveConfiguration)));
         } else {
             String[] cleanedArgs = savepointOptions.getArgs();
 
@@ -767,12 +772,13 @@ public class CliFrontend {
             runClusterAction(
                     activeCommandLine,
                     commandLine,
-                    clusterClient ->
+                    (clusterClient, effectiveConfiguration) ->
                             triggerSavepoint(
                                     clusterClient,
                                     jobId,
                                     savepointDirectory,
-                                    savepointOptions.getFormatType()));
+                                    savepointOptions.getFormatType(),
+                                    getClientTimeout(effectiveConfiguration)));
         }
     }
 
@@ -781,7 +787,8 @@ public class CliFrontend {
             ClusterClient<?> clusterClient,
             JobID jobId,
             String savepointDirectory,
-            SavepointFormatType formatType)
+            SavepointFormatType formatType,
+            Duration clientTimeout)
             throws FlinkException {
         logAndSysout("Triggering savepoint for job " + jobId + '.');
 
@@ -804,7 +811,8 @@ public class CliFrontend {
     }
 
     /** Sends a SavepointDisposalRequest to the job manager. */
-    private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath)
+    private void disposeSavepoint(
+            ClusterClient<?> clusterClient, String savepointPath, Duration clientTimeout)
             throws FlinkException {
         checkNotNull(
                 savepointPath,
@@ -1023,7 +1031,7 @@ public class CliFrontend {
                 clusterClientFactory.createClusterDescriptor(effectiveConfiguration)) {
             try (final ClusterClient<ClusterID> clusterClient =
                     clusterDescriptor.retrieve(clusterId).getClusterClient()) {
-                clusterAction.runAction(clusterClient);
+                clusterAction.runAction(clusterClient, effectiveConfiguration);
             }
         }
     }
@@ -1041,9 +1049,11 @@ public class CliFrontend {
          * Run the cluster action with the given {@link ClusterClient}.
          *
          * @param clusterClient to run the cluster action against
+         * @param effectiveConfiguration Flink effective configuration
          * @throws FlinkException if something goes wrong
          */
-        void runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
+        void runAction(ClusterClient<ClusterID> clusterClient, Configuration effectiveConfiguration)
+                throws FlinkException;
     }
 
     // --------------------------------------------------------------------------------------------
@@ -1288,4 +1298,24 @@ public class CliFrontend {
 
         return constructor.newInstance(params);
     }
+
+    /**
+     * Get client timeout from command line via effective configuration.
+     *
+     * @param effectiveConfiguration Flink effective configuration.
+     * @return client timeout with Duration type
+     */
+    private Duration getClientTimeout(Configuration effectiveConfiguration) {
+        return effectiveConfiguration.get(ClientOptions.CLIENT_TIMEOUT);
+    }
+
+    /**
+     * Get default parallelism from command line via effective configuration.
+     *
+     * @param effectiveConfiguration Flink effective configuration.
+     * @return default parallelism.
+     */
+    private int getDefaultParallelism(Configuration effectiveConfiguration) {
+        return effectiveConfiguration.get(CoreOptions.DEFAULT_PARALLELISM);
+    }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
index 374eb8f8b60..28a07e3e89a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
@@ -33,8 +33,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
 import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
 import static org.junit.Assert.assertEquals;
 
@@ -83,12 +84,19 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
             "arg2"
         };
 
-        verifyCliFrontend(
+        Map<String, String> expectedConfigValues = new HashMap<>();
+        expectedConfigValues.put("parallelism.default", "5");
+        expectedConfigValues.put("classloader.resolve-order", "parent-first");
+        verifyCliFrontendWithDynamicProperties(
                 configuration,
                 args,
                 cliUnderTest,
-                "parent-first",
-                ParentFirstClassLoader.class.getName());
+                expectedConfigValues,
+                (configuration, program) -> {
+                    assertEquals(
+                            ParentFirstClassLoader.class.getName(),
+                            program.getUserCodeClassLoader().getClass().getName());
+                });
     }
 
     @Test
@@ -106,12 +114,18 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
             "arg2"
         };
 
-        verifyCliFrontend(
+        Map<String, String> expectedConfigValues = new HashMap<>();
+        expectedConfigValues.put("parallelism.default", "5");
+        verifyCliFrontendWithDynamicProperties(
                 configuration,
                 args,
                 cliUnderTest,
-                "child-first",
-                ChildFirstClassLoader.class.getName());
+                expectedConfigValues,
+                (configuration, program) -> {
+                    assertEquals(
+                            ChildFirstClassLoader.class.getName(),
+                            program.getUserCodeClassLoader().getClass().getName());
+                });
     }
 
     @Test
@@ -130,56 +144,92 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
             "arg2"
         };
 
-        verifyCliFrontend(
+        Map<String, String> expectedConfigValues = new HashMap<>();
+        expectedConfigValues.put("parallelism.default", "5");
+        expectedConfigValues.put("classloader.resolve-order", "child-first");
+        verifyCliFrontendWithDynamicProperties(
                 configuration,
                 args,
                 cliUnderTest,
-                "child-first",
-                ChildFirstClassLoader.class.getName());
+                expectedConfigValues,
+                (configuration, program) -> {
+                    assertEquals(
+                            ChildFirstClassLoader.class.getName(),
+                            program.getUserCodeClassLoader().getClass().getName());
+                });
+    }
+
+    @Test
+    public void testDynamicPropertiesWithClientTimeoutAndDefaultParallelism() throws Exception {
+
+        String[] args = {
+            "-e",
+            "test-executor",
+            "-Dclient.timeout=10min",
+            "-Dparallelism.default=12",
+            getTestJarPath(),
+        };
+        Map<String, String> expectedConfigValues = new HashMap<>();
+        expectedConfigValues.put("client.timeout", "10min");
+        expectedConfigValues.put("parallelism.default", "12");
+        verifyCliFrontendWithDynamicProperties(
+                configuration, args, cliUnderTest, expectedConfigValues);
     }
 
     // --------------------------------------------------------------------------------------------
 
-    public static void verifyCliFrontend(
+    public static void verifyCliFrontendWithDynamicProperties(
             Configuration configuration,
             String[] parameters,
             GenericCLI cliUnderTest,
-            String expectedResolveOrderOption,
-            String userCodeClassLoaderClassName)
+            Map<String, String> expectedConfigValues)
             throws Exception {
-        TestingCliFrontend testFrontend =
-                new TestingCliFrontend(
-                        configuration,
-                        cliUnderTest,
-                        expectedResolveOrderOption,
-                        userCodeClassLoaderClassName);
-        testFrontend.run(parameters); // verifies the expected values (see below)
+        verifyCliFrontendWithDynamicProperties(
+                configuration, parameters, cliUnderTest, expectedConfigValues, null);
     }
 
-    private static final class TestingCliFrontend extends CliFrontend {
+    public static void verifyCliFrontendWithDynamicProperties(
+            Configuration configuration,
+            String[] parameters,
+            GenericCLI cliUnderTest,
+            Map<String, String> expectedConfigValues,
+            TestingCliFrontendWithDynamicProperties.CustomTester customTester)
+            throws Exception {
+        TestingCliFrontendWithDynamicProperties testFrontend =
+                new TestingCliFrontendWithDynamicProperties(
+                        configuration, cliUnderTest, expectedConfigValues, customTester);
+        testFrontend.run(parameters); // verifies the expected values (see below)
+    }
 
-        private final String expectedResolveOrder;
+    private static final class TestingCliFrontendWithDynamicProperties extends CliFrontend {
+        private final Map<String, String> expectedConfigValues;
 
-        private final String userCodeClassLoaderClassName;
+        private final CustomTester tester;
 
-        private TestingCliFrontend(
+        private TestingCliFrontendWithDynamicProperties(
                 Configuration configuration,
-                GenericCLI cliUnderTest,
-                String expectedResolveOrderOption,
-                String userCodeClassLoaderClassName) {
-            super(configuration, Collections.singletonList(cliUnderTest));
-            this.expectedResolveOrder = expectedResolveOrderOption;
-            this.userCodeClassLoaderClassName = userCodeClassLoaderClassName;
+                GenericCLI cli,
+                Map<String, String> expectedConfigValues,
+                CustomTester customTester) {
+            super(configuration, Collections.singletonList(cli));
+            this.expectedConfigValues = expectedConfigValues;
+            this.tester = customTester;
+        }
+
+        @FunctionalInterface
+        private interface CustomTester {
+            void test(Configuration configuration, PackagedProgram program);
         }
 
         @Override
         protected void executeProgram(Configuration configuration, PackagedProgram program) {
-            assertEquals(TEST_JAR_MAIN_CLASS, program.getMainClassName());
-            assertEquals(
-                    expectedResolveOrder, configuration.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER));
-            assertEquals(
-                    userCodeClassLoaderClassName,
-                    program.getUserCodeClassLoader().getClass().getName());
+            expectedConfigValues.forEach(
+                    (key, value) -> {
+                        assertEquals(configuration.toMap().get(key), value);
+                    });
+            if (tester != null) {
+                tester.test(configuration, program);
+            }
         }
     }
 }