You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/07/26 09:18:11 UTC
[flink] branch master updated: [FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c4b107835c8 [FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend
c4b107835c8 is described below
commit c4b107835c8bdae0667efcbdfe52aa3a34aec894
Author: Paul Zhang <xz...@126.com>
AuthorDate: Thu May 19 16:07:07 2022 +0800
[FLINK-27579][client] Make the client.timeout and parallelism.default could take effect in CLI frontend
This closes #19772.
---
.../org/apache/flink/client/cli/CliFrontend.java | 84 +++++++++-----
.../cli/CliFrontendDynamicPropertiesTest.java | 122 +++++++++++++++------
2 files changed, 143 insertions(+), 63 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index caa5f8f242e..652df51c292 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -110,10 +110,6 @@ public class CliFrontend {
private final Options customCommandLineOptions;
- private final Duration clientTimeout;
-
- private final int defaultParallelism;
-
private final ClusterClientServiceLoader clusterClientServiceLoader;
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) {
@@ -137,9 +133,6 @@ public class CliFrontend {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
-
- this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
- this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
// --------------------------------------------------------------------------------------------
@@ -340,11 +333,6 @@ public class CliFrontend {
PackagedProgram program = null;
try {
- int parallelism = programOptions.getParallelism();
- if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
- parallelism = defaultParallelism;
- }
-
LOG.info("Creating program plan dump");
final CustomCommandLine activeCommandLine =
@@ -359,6 +347,11 @@ public class CliFrontend {
program = buildProgram(programOptions, effectiveConfiguration);
+ int parallelism = programOptions.getParallelism();
+ if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
+ parallelism = getDefaultParallelism(effectiveConfiguration);
+ }
+
Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(
program, effectiveConfiguration, parallelism, true);
@@ -428,7 +421,8 @@ public class CliFrontend {
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient -> listJobs(clusterClient, showRunning, showScheduled, showAll));
+ (clusterClient, effectiveConfiguration) ->
+ listJobs(clusterClient, showRunning, showScheduled, showAll));
}
private <ClusterID> void listJobs(
@@ -570,10 +564,11 @@ public class CliFrontend {
+ " savepoint.");
final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine);
+
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient -> {
+ (clusterClient, effectiveConfiguration) -> {
final String savepointPath;
try {
savepointPath =
@@ -583,7 +578,9 @@ public class CliFrontend {
advanceToEndOfEventTime,
targetDirectory,
formatType)
- .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ .get(
+ getClientTimeout(effectiveConfiguration).toMillis(),
+ TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new FlinkException(
"Could not stop with a savepoint job \"" + jobId + "\".", e);
@@ -653,13 +650,16 @@ public class CliFrontend {
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient -> {
+ (clusterClient, effectiveConfiguration) -> {
final String savepointPath;
try {
savepointPath =
clusterClient
.cancelWithSavepoint(jobId, targetDirectory, formatType)
- .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ .get(
+ getClientTimeout(effectiveConfiguration)
+ .toMillis(),
+ TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new FlinkException("Could not cancel job " + jobId + '.', e);
}
@@ -684,11 +684,13 @@ public class CliFrontend {
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient -> {
+ (clusterClient, effectiveConfiguration) -> {
try {
clusterClient
.cancel(jobId)
- .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ .get(
+ getClientTimeout(effectiveConfiguration).toMillis(),
+ TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new FlinkException("Could not cancel job " + jobId + '.', e);
}
@@ -735,8 +737,11 @@ public class CliFrontend {
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient ->
- disposeSavepoint(clusterClient, savepointOptions.getSavepointPath()));
+ (clusterClient, effectiveConfiguration) ->
+ disposeSavepoint(
+ clusterClient,
+ savepointOptions.getSavepointPath(),
+ getClientTimeout(effectiveConfiguration)));
} else {
String[] cleanedArgs = savepointOptions.getArgs();
@@ -767,12 +772,13 @@ public class CliFrontend {
runClusterAction(
activeCommandLine,
commandLine,
- clusterClient ->
+ (clusterClient, effectiveConfiguration) ->
triggerSavepoint(
clusterClient,
jobId,
savepointDirectory,
- savepointOptions.getFormatType()));
+ savepointOptions.getFormatType(),
+ getClientTimeout(effectiveConfiguration)));
}
}
@@ -781,7 +787,8 @@ public class CliFrontend {
ClusterClient<?> clusterClient,
JobID jobId,
String savepointDirectory,
- SavepointFormatType formatType)
+ SavepointFormatType formatType,
+ Duration clientTimeout)
throws FlinkException {
logAndSysout("Triggering savepoint for job " + jobId + '.');
@@ -804,7 +811,8 @@ public class CliFrontend {
}
/** Sends a SavepointDisposalRequest to the job manager. */
- private void disposeSavepoint(ClusterClient<?> clusterClient, String savepointPath)
+ private void disposeSavepoint(
+ ClusterClient<?> clusterClient, String savepointPath, Duration clientTimeout)
throws FlinkException {
checkNotNull(
savepointPath,
@@ -1023,7 +1031,7 @@ public class CliFrontend {
clusterClientFactory.createClusterDescriptor(effectiveConfiguration)) {
try (final ClusterClient<ClusterID> clusterClient =
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
- clusterAction.runAction(clusterClient);
+ clusterAction.runAction(clusterClient, effectiveConfiguration);
}
}
}
@@ -1041,9 +1049,11 @@ public class CliFrontend {
* Run the cluster action with the given {@link ClusterClient}.
*
* @param clusterClient to run the cluster action against
+ * @param effectiveConfiguration Flink effective configuration
* @throws FlinkException if something goes wrong
*/
- void runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
+ void runAction(ClusterClient<ClusterID> clusterClient, Configuration effectiveConfiguration)
+ throws FlinkException;
}
// --------------------------------------------------------------------------------------------
@@ -1288,4 +1298,24 @@ public class CliFrontend {
return constructor.newInstance(params);
}
+
+ /**
+ * Get client timeout from command line via effective configuration.
+ *
+ * @param effectiveConfiguration Flink effective configuration.
+ * @return client timeout with Duration type
+ */
+ private Duration getClientTimeout(Configuration effectiveConfiguration) {
+ return effectiveConfiguration.get(ClientOptions.CLIENT_TIMEOUT);
+ }
+
+ /**
+ * Get default parallelism from command line via effective configuration.
+ *
+ * @param effectiveConfiguration Flink effective configuration.
+ * @return default parallelism.
+ */
+ private int getDefaultParallelism(Configuration effectiveConfiguration) {
+ return effectiveConfiguration.get(CoreOptions.DEFAULT_PARALLELISM);
+ }
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
index 374eb8f8b60..28a07e3e89a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendDynamicPropertiesTest.java
@@ -33,8 +33,9 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
-import static org.apache.flink.client.cli.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
import static org.junit.Assert.assertEquals;
@@ -83,12 +84,19 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
"arg2"
};
- verifyCliFrontend(
+ Map<String, String> expectedConfigValues = new HashMap<>();
+ expectedConfigValues.put("parallelism.default", "5");
+ expectedConfigValues.put("classloader.resolve-order", "parent-first");
+ verifyCliFrontendWithDynamicProperties(
configuration,
args,
cliUnderTest,
- "parent-first",
- ParentFirstClassLoader.class.getName());
+ expectedConfigValues,
+ (configuration, program) -> {
+ assertEquals(
+ ParentFirstClassLoader.class.getName(),
+ program.getUserCodeClassLoader().getClass().getName());
+ });
}
@Test
@@ -106,12 +114,18 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
"arg2"
};
- verifyCliFrontend(
+ Map<String, String> expectedConfigValues = new HashMap<>();
+ expectedConfigValues.put("parallelism.default", "5");
+ verifyCliFrontendWithDynamicProperties(
configuration,
args,
cliUnderTest,
- "child-first",
- ChildFirstClassLoader.class.getName());
+ expectedConfigValues,
+ (configuration, program) -> {
+ assertEquals(
+ ChildFirstClassLoader.class.getName(),
+ program.getUserCodeClassLoader().getClass().getName());
+ });
}
@Test
@@ -130,56 +144,92 @@ public class CliFrontendDynamicPropertiesTest extends CliFrontendTestBase {
"arg2"
};
- verifyCliFrontend(
+ Map<String, String> expectedConfigValues = new HashMap<>();
+ expectedConfigValues.put("parallelism.default", "5");
+ expectedConfigValues.put("classloader.resolve-order", "child-first");
+ verifyCliFrontendWithDynamicProperties(
configuration,
args,
cliUnderTest,
- "child-first",
- ChildFirstClassLoader.class.getName());
+ expectedConfigValues,
+ (configuration, program) -> {
+ assertEquals(
+ ChildFirstClassLoader.class.getName(),
+ program.getUserCodeClassLoader().getClass().getName());
+ });
+ }
+
+ @Test
+ public void testDynamicPropertiesWithClientTimeoutAndDefaultParallelism() throws Exception {
+
+ String[] args = {
+ "-e",
+ "test-executor",
+ "-Dclient.timeout=10min",
+ "-Dparallelism.default=12",
+ getTestJarPath(),
+ };
+ Map<String, String> expectedConfigValues = new HashMap<>();
+ expectedConfigValues.put("client.timeout", "10min");
+ expectedConfigValues.put("parallelism.default", "12");
+ verifyCliFrontendWithDynamicProperties(
+ configuration, args, cliUnderTest, expectedConfigValues);
}
// --------------------------------------------------------------------------------------------
- public static void verifyCliFrontend(
+ public static void verifyCliFrontendWithDynamicProperties(
Configuration configuration,
String[] parameters,
GenericCLI cliUnderTest,
- String expectedResolveOrderOption,
- String userCodeClassLoaderClassName)
+ Map<String, String> expectedConfigValues)
throws Exception {
- TestingCliFrontend testFrontend =
- new TestingCliFrontend(
- configuration,
- cliUnderTest,
- expectedResolveOrderOption,
- userCodeClassLoaderClassName);
- testFrontend.run(parameters); // verifies the expected values (see below)
+ verifyCliFrontendWithDynamicProperties(
+ configuration, parameters, cliUnderTest, expectedConfigValues, null);
}
- private static final class TestingCliFrontend extends CliFrontend {
+ public static void verifyCliFrontendWithDynamicProperties(
+ Configuration configuration,
+ String[] parameters,
+ GenericCLI cliUnderTest,
+ Map<String, String> expectedConfigValues,
+ TestingCliFrontendWithDynamicProperties.CustomTester customTester)
+ throws Exception {
+ TestingCliFrontendWithDynamicProperties testFrontend =
+ new TestingCliFrontendWithDynamicProperties(
+ configuration, cliUnderTest, expectedConfigValues, customTester);
+ testFrontend.run(parameters); // verifies the expected values (see below)
+ }
- private final String expectedResolveOrder;
+ private static final class TestingCliFrontendWithDynamicProperties extends CliFrontend {
+ private final Map<String, String> expectedConfigValues;
- private final String userCodeClassLoaderClassName;
+ private final CustomTester tester;
- private TestingCliFrontend(
+ private TestingCliFrontendWithDynamicProperties(
Configuration configuration,
- GenericCLI cliUnderTest,
- String expectedResolveOrderOption,
- String userCodeClassLoaderClassName) {
- super(configuration, Collections.singletonList(cliUnderTest));
- this.expectedResolveOrder = expectedResolveOrderOption;
- this.userCodeClassLoaderClassName = userCodeClassLoaderClassName;
+ GenericCLI cli,
+ Map<String, String> expectedConfigValues,
+ CustomTester customTester) {
+ super(configuration, Collections.singletonList(cli));
+ this.expectedConfigValues = expectedConfigValues;
+ this.tester = customTester;
+ }
+
+ @FunctionalInterface
+ private interface CustomTester {
+ void test(Configuration configuration, PackagedProgram program);
}
@Override
protected void executeProgram(Configuration configuration, PackagedProgram program) {
- assertEquals(TEST_JAR_MAIN_CLASS, program.getMainClassName());
- assertEquals(
- expectedResolveOrder, configuration.get(CoreOptions.CLASSLOADER_RESOLVE_ORDER));
- assertEquals(
- userCodeClassLoaderClassName,
- program.getUserCodeClassLoader().getClass().getName());
+ expectedConfigValues.forEach(
+ (key, value) -> {
+ assertEquals(configuration.toMap().get(key), value);
+ });
+ if (tester != null) {
+ tester.test(configuration, program);
+ }
}
}
}