You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/16 06:50:09 UTC

[flink-statefun] 03/03: [FLINK-16254] [e2e] Adapt StatefulFunctionsAppContainers to use the -p command option

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 05888397d049f23c9bc960a667d8350570c5f95c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Sat Mar 14 14:25:25 2020 +0800

    [FLINK-16254] [e2e] Adapt StatefulFunctionsAppContainers to use the -p command option
    
    This closes #58.
---
 .../statefun/e2e/common/StatefulFunctionsAppContainers.java   | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 5d1cc87..f648fcd 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -31,7 +31,6 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.junit.rules.ExternalResource;
@@ -142,10 +141,6 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     this.network = Network.newNetwork();
     this.appName = appName;
     this.numWorkers = numWorkers;
-
-    if (numWorkers > 1) {
-      dynamicProperties.set(CoreOptions.DEFAULT_PARALLELISM, numWorkers);
-    }
   }
 
   public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
@@ -180,7 +175,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   protected void before() throws Throwable {
     final ImageFromDockerfile appImage =
         appImage(appName, dynamicProperties, classpathBuildContextFiles);
-    this.master = masterContainer(appImage, network, dependentContainers, masterLogger);
+    this.master = masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger);
     this.workers = workerContainers(appImage, numWorkers, network);
 
     master.start();
@@ -261,13 +256,15 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
       ImageFromDockerfile appImage,
       Network network,
       List<GenericContainer<?>> dependents,
+      int numWorkers,
       @Nullable Logger masterLogger) {
     final GenericContainer<?> master =
         new GenericContainer(appImage)
             .withNetwork(network)
             .withNetworkAliases(MASTER_HOST)
             .withEnv("ROLE", "master")
-            .withEnv("MASTER_HOST", MASTER_HOST);
+            .withEnv("MASTER_HOST", MASTER_HOST)
+            .withCommand("-p " + numWorkers);
 
     for (GenericContainer<?> dependent : dependents) {
       master.dependsOn(dependent);