You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/16 06:50:07 UTC
[flink-statefun] 01/03: [FLINK-16254] Add parallelism to
StatefulFunctionsClusterConfiguration
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit e4f847ef9a060f0caf87ee20fec02f1df4dbfec9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 15:41:31 2020 +0800
[FLINK-16254] Add parallelism to StatefulFunctionsClusterConfiguration
This commit adds a parallelism value to
StatefulFunctionsClusterConfiguration, parsed from the command line.
---
.../StatefulFunctionsClusterConfiguration.java | 10 +++++++++-
...FunctionsClusterConfigurationParserFactory.java | 23 +++++++++++++++++++++-
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
index d2ff55f..0afcfea 100644
--- a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
+++ b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
@@ -37,6 +37,8 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
@Nullable private final JobID jobId;
+ private final int parallelism;
+
StatefulFunctionsClusterConfiguration(
@Nonnull String configDir,
@Nonnull Properties dynamicProperties,
@@ -44,11 +46,13 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
@Nullable String hostname,
int restPort,
@Nonnull SavepointRestoreSettings savepointRestoreSettings,
- @Nullable JobID jobId) {
+ @Nullable JobID jobId,
+ int parallelism) {
super(configDir, dynamicProperties, args, hostname, restPort);
this.savepointRestoreSettings =
requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
this.jobId = jobId;
+ this.parallelism = parallelism;
}
@Nonnull
@@ -60,4 +64,8 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
JobID getJobId() {
return jobId;
}
+
+ public int getParallelism() { // accessor for the parallelism value supplied to the constructor
+ return parallelism;
+ }
}
diff --git a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
index da6cbfb..6b00f4c 100644
--- a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
+++ b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.statefun.flink.launcher;
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
@@ -28,6 +29,7 @@ import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -77,6 +79,7 @@ public class StatefulFunctionsClusterConfigurationParserFactory
options.addOption(REST_PORT_OPTION);
options.addOption(JOB_ID_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
+ options.addOption(PARALLELISM_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
@@ -91,6 +94,7 @@ public class StatefulFunctionsClusterConfigurationParserFactory
commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final int restPort = getRestPort(commandLine);
final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
+ final int parallelism = getParallelism(commandLine);
final SavepointRestoreSettings savepointRestoreSettings =
CliFrontendParser.createSavepointRestoreSettings(commandLine);
final JobID jobId = getJobId(commandLine);
@@ -102,7 +106,8 @@ public class StatefulFunctionsClusterConfigurationParserFactory
hostname,
restPort,
savepointRestoreSettings,
- jobId);
+ jobId,
+ parallelism);
}
private int getRestPort(CommandLine commandLine) throws FlinkParseException {
@@ -113,4 +118,20 @@ public class StatefulFunctionsClusterConfigurationParserFactory
throw createFlinkParseException(REST_PORT_OPTION, e);
}
}
+
+ private int getParallelism(CommandLine commandLine) throws FlinkParseException { // parses the PARALLELISM_OPTION value from the command line into an int
+ final String parallelismString =
+ commandLine.getOptionValue(
+ PARALLELISM_OPTION.getOpt(), String.valueOf(ExecutionConfig.PARALLELISM_DEFAULT)); // PARALLELISM_DEFAULT is the fallback when the option is not given
+ try {
+ int parallelism = Integer.parseInt(parallelismString);
+ if (parallelism <= 0 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) { // non-positive values are rejected unless they equal PARALLELISM_DEFAULT
+ throw new IllegalArgumentException( // NOTE(review): not a NumberFormatException, so the catch below does not wrap it; it escapes as an unchecked exception
+ "Parallelism must be at least 1. Provided: " + parallelism);
+ }
+ return parallelism;
+ } catch (NumberFormatException e) { // only non-numeric input is converted into a FlinkParseException
+ throw createFlinkParseException(PARALLELISM_OPTION, e);
+ }
+ }
}