You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/16 06:50:07 UTC

[flink-statefun] 01/03: [FLINK-16254] Add parallelism to StatefulFunctionsClusterConfiguration

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e4f847ef9a060f0caf87ee20fec02f1df4dbfec9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 15:41:31 2020 +0800

    [FLINK-16254] Add parallelism to StatefulFunctionsClusterConfiguration
    
    This commit adds parallelism value to
    StatefulFunctionsClusterConfiguration, parsed from the command line.
---
 .../StatefulFunctionsClusterConfiguration.java     | 10 +++++++++-
 ...FunctionsClusterConfigurationParserFactory.java | 23 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
index d2ff55f..0afcfea 100644
--- a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
+++ b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfiguration.java
@@ -37,6 +37,8 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
 
   @Nullable private final JobID jobId;
 
+  private final int parallelism;
+
   StatefulFunctionsClusterConfiguration(
       @Nonnull String configDir,
       @Nonnull Properties dynamicProperties,
@@ -44,11 +46,13 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
       @Nullable String hostname,
       int restPort,
       @Nonnull SavepointRestoreSettings savepointRestoreSettings,
-      @Nullable JobID jobId) {
+      @Nullable JobID jobId,
+      int parallelism) {
     super(configDir, dynamicProperties, args, hostname, restPort);
     this.savepointRestoreSettings =
         requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
     this.jobId = jobId;
+    this.parallelism = parallelism;
   }
 
   @Nonnull
@@ -60,4 +64,8 @@ final class StatefulFunctionsClusterConfiguration extends EntrypointClusterConfi
   JobID getJobId() {
     return jobId;
   }
+
+  public int getParallelism() {
+    return parallelism;
+  }
 }
diff --git a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
index da6cbfb..6b00f4c 100644
--- a/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
+++ b/statefun-flink/statefun-flink-launcher/src/main/java/org/apache/flink/statefun/flink/launcher/StatefulFunctionsClusterConfigurationParserFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.statefun.flink.launcher;
 
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
@@ -28,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -77,6 +79,7 @@ public class StatefulFunctionsClusterConfigurationParserFactory
     options.addOption(REST_PORT_OPTION);
     options.addOption(JOB_ID_OPTION);
     options.addOption(DYNAMIC_PROPERTY_OPTION);
+    options.addOption(PARALLELISM_OPTION);
     options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
     options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
 
@@ -91,6 +94,7 @@ public class StatefulFunctionsClusterConfigurationParserFactory
         commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
     final int restPort = getRestPort(commandLine);
     final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
+    final int parallelism = getParallelism(commandLine);
     final SavepointRestoreSettings savepointRestoreSettings =
         CliFrontendParser.createSavepointRestoreSettings(commandLine);
     final JobID jobId = getJobId(commandLine);
@@ -102,7 +106,8 @@ public class StatefulFunctionsClusterConfigurationParserFactory
         hostname,
         restPort,
         savepointRestoreSettings,
-        jobId);
+        jobId,
+        parallelism);
   }
 
   private int getRestPort(CommandLine commandLine) throws FlinkParseException {
@@ -113,4 +118,20 @@ public class StatefulFunctionsClusterConfigurationParserFactory
       throw createFlinkParseException(REST_PORT_OPTION, e);
     }
   }
+
+  private int getParallelism(CommandLine commandLine) throws FlinkParseException {
+    final String parallelismString =
+        commandLine.getOptionValue(
+            PARALLELISM_OPTION.getOpt(), String.valueOf(ExecutionConfig.PARALLELISM_DEFAULT));
+    try {
+      int parallelism = Integer.parseInt(parallelismString);
+      if (parallelism <= 0 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
+        throw new IllegalArgumentException(
+            "Parallelism must be at least 1. Provided: " + parallelism);
+      }
+      return parallelism;
+    } catch (NumberFormatException e) {
+      throw createFlinkParseException(PARALLELISM_OPTION, e);
+    }
+  }
 }