You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/29 03:51:09 UTC

[flink-statefun] branch master updated (1a31c52 -> e2305e8)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 1a31c52  [FLINK-16321] Remove HttpFunction in favor of RequestReplyFunction
     new 14dc985  [FLINK-16274] [e2e] Make container construction lazy in StatefulFunctionsAppContainers
     new f61faac  [hotfix] POM for statefun-flink should inherit protobuf.version property
     new 5c82fe7  [FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings
     new dbb7165  [FLINK-16274] [e2e] Use new dynamic configuration methods in SanityVerificationITCase
     new e2305e8  [FLINK-16274] Consolidate constants for Kafka bootstrap server configuration

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../statefun-e2e-tests-common/pom.xml              |  12 ++
 .../e2e/common/StatefulFunctionsAppContainers.java | 126 ++++++++++++++-------
 .../src/main/resources}/flink-conf.yaml            |   0
 .../flink/statefun/e2e/sanity/Constants.java       |   2 +-
 .../e2e/sanity/SanityVerificationModule.java       |   9 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |  17 +--
 .../src/test/resources/flink-conf.yaml             |  25 ----
 statefun-flink/pom.xml                             |   3 +-
 .../flink/core/StatefulFunctionsConfig.java        |   2 +-
 9 files changed, 113 insertions(+), 83 deletions(-)
 copy {tools/docker/flink-distribution-template/conf => statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources}/flink-conf.yaml (100%)
 delete mode 100644 statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/flink-conf.yaml


[flink-statefun] 04/05: [FLINK-16274] [e2e] Use new dynamic configuration methods in SanityVerificationITCase

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit dbb7165185b93a58079c244a631112aebea29634
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:30:12 2020 +0800

    [FLINK-16274] [e2e] Use new dynamic configuration methods in SanityVerificationITCase
---
 .../statefun/e2e/sanity/SanityVerificationE2E.java | 12 +++--------
 .../src/test/resources/flink-conf.yaml             | 25 ----------------------
 2 files changed, 3 insertions(+), 34 deletions(-)

diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 87890ae..33ac127 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -23,7 +23,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.util.Collections;
 import java.util.Properties;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
 import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
 import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
@@ -57,12 +56,6 @@ public class SanityVerificationE2E {
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
 
-  private static final Configuration flinkConf = new Configuration();
-
-  static {
-    flinkConf.setString("statefun.module.global-config.kafka-broker", Constants.KAFKA_BROKER_HOST);
-  }
-
   @Rule
   public KafkaContainer kafka =
       new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
@@ -70,9 +63,10 @@ public class SanityVerificationE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("sanity-verification", 2, flinkConf)
+      new StatefulFunctionsAppContainers("sanity-verification", 2)
           .dependsOn(kafka)
-          .exposeMasterLogs(LOG);
+          .exposeMasterLogs(LOG)
+          .withModuleGlobalConfiguration("kafka-broker", Constants.KAFKA_BROKER_HOST);
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/flink-conf.yaml
deleted file mode 100644
index a3887ac..0000000
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/flink-conf.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# This file is the base for the Apache Flink configuration
-
-# this allows us to test local as well as remote function messaging
-parallelism.default: 2
-
-classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
-state.backend: rocksdb
-state.backend.rocksdb.timer-service.factory: ROCKSDB
-state.checkpoints.dir: file:///checkpoint-dir
-state.backend.incremental: true
-taskmanager.memory.process.size: 4g


[flink-statefun] 01/05: [FLINK-16274] [e2e] Make container construction lazy in StatefulFunctionsAppContainers

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 14dc9857da17dc095f74a68319559e3d133e86a4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 15:36:27 2020 +0800

    [FLINK-16274] [e2e] Make container construction lazy in StatefulFunctionsAppContainers
---
 .../e2e/common/StatefulFunctionsAppContainers.java | 52 ++++++++++++++++------
 1 file changed, 38 insertions(+), 14 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 651e382..21b4e69 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -113,9 +113,16 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private static final String MASTER_HOST = "statefun-app-master";
   private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
 
+  private final String appName;
+  private final int numWorkers;
   private final Network network;
-  private final GenericContainer<?> master;
-  private final List<GenericContainer<?>> workers;
+
+  private final Configuration dynamicProperties;
+  private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
+  private Logger masterLogger;
+
+  private GenericContainer<?> master;
+  private List<GenericContainer<?>> workers;
 
   public StatefulFunctionsAppContainers(String appName, int numWorkers) {
     this(appName, numWorkers, null);
@@ -132,25 +139,28 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     }
 
     this.network = Network.newNetwork();
-
-    final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
-    this.master = masterContainer(appImage, network);
-    this.workers = workerContainers(appImage, numWorkers, network);
+    this.appName = appName;
+    this.numWorkers = numWorkers;
+    this.dynamicProperties = dynamicProperties;
   }
 
   public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
     container.withNetwork(network);
-    master.dependsOn(container);
+    this.dependentContainers.add(container);
     return this;
   }
 
   public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
-    master.withLogConsumer(new Slf4jLogConsumer(logger, true));
+    this.masterLogger = logger;
     return this;
   }
 
   @Override
   protected void before() throws Throwable {
+    final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
+    this.master = masterContainer(appImage, network, dependentContainers, masterLogger);
+    this.workers = workerContainers(appImage, numWorkers, network);
+
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -206,12 +216,26 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   }
 
   private static GenericContainer<?> masterContainer(
-      ImageFromDockerfile appImage, Network network) {
-    return new GenericContainer(appImage)
-        .withNetwork(network)
-        .withNetworkAliases(MASTER_HOST)
-        .withEnv("ROLE", "master")
-        .withEnv("MASTER_HOST", MASTER_HOST);
+      ImageFromDockerfile appImage,
+      Network network,
+      List<GenericContainer<?>> dependents,
+      @Nullable Logger masterLogger) {
+    final GenericContainer<?> master =
+        new GenericContainer(appImage)
+            .withNetwork(network)
+            .withNetworkAliases(MASTER_HOST)
+            .withEnv("ROLE", "master")
+            .withEnv("MASTER_HOST", MASTER_HOST);
+
+    for (GenericContainer<?> dependent : dependents) {
+      master.dependsOn(dependent);
+    }
+
+    if (masterLogger != null) {
+      master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+    }
+
+    return master;
   }
 
   private static List<GenericContainer<?>> workerContainers(


[flink-statefun] 02/05: [hotfix] POM for statefun-flink should inherit protobuf.version property

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f61faac69aa00733e472bed3aaaf58d6396c4392
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:33:19 2020 +0800

    [hotfix] POM for statefun-flink should inherit protobuf.version property
---
 statefun-flink/pom.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/statefun-flink/pom.xml b/statefun-flink/pom.xml
index 66c25e1..6e5493b 100644
--- a/statefun-flink/pom.xml
+++ b/statefun-flink/pom.xml
@@ -46,7 +46,6 @@ under the License.
         <jsr305.version>3.0.2</jsr305.version>
         <flink.version>1.10.0</flink.version>
         <jmh.version>1.21</jmh.version>
-        <protobuf-java.version>3.8.0</protobuf-java.version>
         <jsr305-version>1.3.9</jsr305-version>
     </properties>
 
@@ -135,7 +134,7 @@ under the License.
             <dependency>
                 <groupId>com.google.protobuf</groupId>
                 <artifactId>protobuf-java</artifactId>
-                <version>${protobuf-java.version}</version>
+                <version>${protobuf.version}</version>
             </dependency>
 
             <!-- JMH -->


[flink-statefun] 03/05: [FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 5c82fe729e5b23687b904fa98af2acb1d82d0a00
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:25:33 2020 +0800

    [FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings
    
    This commit adds the following methods:
    * withModuleGlobalConfiguration(String, String)
    * <T> withConfiguration(ConfigOption<T>, T)
    
    These methods build up a Configuration object holding dynamic
    properties. These properties will be merged with a base flink-conf.yaml
    that defines the template settings.
    
    This commit also eliminates the need for users of
    StatefulFunctionsAppContainers to provide a flink-conf.yaml;
    the configuration can therefore be set completely programmatically.
---
 .../statefun-e2e-tests-common/pom.xml              | 12 ++++
 .../e2e/common/StatefulFunctionsAppContainers.java | 80 ++++++++++++++--------
 .../src/main/resources/flink-conf.yaml             | 22 ++++++
 .../flink/core/StatefulFunctionsConfig.java        |  2 +-
 4 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index cf50a29..7f4801b 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -34,6 +34,14 @@ under the License.
     </properties>
 
     <dependencies>
+        <!-- Stateful Functions core -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Protobuf -->
         <dependency>
             <groupId>com.google.protobuf</groupId>
@@ -77,6 +85,10 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 21b4e69..f7e284d 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -19,16 +19,20 @@
 package org.apache.flink.statefun.e2e.common;
 
 import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +121,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private final int numWorkers;
   private final Network network;
 
-  private final Configuration dynamicProperties;
+  private final Configuration dynamicProperties = new Configuration();
   private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
   private Logger masterLogger;
 
@@ -125,11 +129,6 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private List<GenericContainer<?>> workers;
 
   public StatefulFunctionsAppContainers(String appName, int numWorkers) {
-    this(appName, numWorkers, null);
-  }
-
-  public StatefulFunctionsAppContainers(
-      String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
     if (appName == null || appName.isEmpty()) {
       throw new IllegalArgumentException(
           "App name must be non-empty. This is used as the application image name.");
@@ -141,7 +140,10 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     this.network = Network.newNetwork();
     this.appName = appName;
     this.numWorkers = numWorkers;
-    this.dynamicProperties = dynamicProperties;
+
+    if (numWorkers > 1) {
+      dynamicProperties.set(CoreOptions.DEFAULT_PARALLELISM, numWorkers);
+    }
   }
 
   public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
@@ -155,6 +157,16 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return this;
   }
 
+  public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) {
+    this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
+    return this;
+  }
+
+  public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) {
+    this.dynamicProperties.set(config, value);
+    return this;
+  }
+
   @Override
   protected void before() throws Throwable {
     final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
@@ -171,8 +183,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(
-      String appName, @Nullable Configuration dynamicProperties) {
+  private static ImageFromDockerfile appImage(String appName, Configuration dynamicProperties) {
     final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
     LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
 
@@ -181,29 +192,30 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
             .withFileFromClasspath("Dockerfile", "Dockerfile")
             .withFileFromPath(".", targetDirPath);
 
-    Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
-    if (flinkConf != null) {
-      appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
-    }
+    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+    String flinkConfString = flinkConfigAsString(flinkConf);
+    LOG.info(
+        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
+        flinkConf);
+    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
 
     return appImage;
   }
 
-  private static @Nullable Configuration loadFlinkConfIfAvailable(
-      @Nullable Configuration dynamicProperties) {
-    final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
-    if (flinkConfUrl == null) {
-      return dynamicProperties;
-    }
-
-    final String flinkConfDir;
-    try {
-      flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Failed to load flink-conf.yaml", e);
+  /**
+   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
+   * resources.
+   */
+  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
+    final InputStream baseFlinkConfResourceInputStream =
+        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+    if (baseFlinkConfResourceInputStream == null) {
+      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
     }
 
-    return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+    return GlobalConfiguration.loadConfiguration(
+        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
   }
 
   private static String flinkConfigAsString(Configuration configuration) {
@@ -215,6 +227,18 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return yaml.toString();
   }
 
+  private static File copyToTempFlinkConfFile(InputStream inputStream) {
+    try {
+      final File tempFile =
+          new File(
+              Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
+      Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+      return tempFile;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static GenericContainer<?> masterContainer(
       ImageFromDockerfile appImage,
       Network network,
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
new file mode 100644
index 0000000..4af978f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+taskmanager.memory.process.size: 4g
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 969a194..ce40be0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -45,7 +45,7 @@ public class StatefulFunctionsConfig implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
+  public static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
 
   // This configuration option exists for the documentation generator
   @SuppressWarnings("unused")


[flink-statefun] 05/05: [FLINK-16274] Consolidate constants for Kafka bootstrap server configuration

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e2305e866d363c4fbced154926af6b8df7d7c979
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:49:59 2020 +0800

    [FLINK-16274] Consolidate constants for Kafka bootstrap server configuration
    
    This closes #37.
---
 .../java/org/apache/flink/statefun/e2e/sanity/Constants.java     | 2 +-
 .../flink/statefun/e2e/sanity/SanityVerificationModule.java      | 9 +++++----
 .../apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java  | 7 ++++---
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
index b1d16dd..a17f229 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
@@ -28,7 +28,7 @@ final class Constants {
 
   private Constants() {}
 
-  static final String KAFKA_BROKER_HOST = "kafka-broker";
+  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
 
   static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
       new IngressIdentifier<>(Command.class, "org.apache.flink.itcases.sanity", "commands");
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
index 0802265..519d1c4 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
@@ -38,12 +38,13 @@ public class SanityVerificationModule implements StatefulFunctionModule {
 
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaAddress = globalConfiguration.get("kafka-broker");
-    if (kafkaAddress == null) {
-      throw new IllegalStateException("Missing required global configuration kafka-broker");
+    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    if (kafkaBootstrapServers == null) {
+      throw new IllegalStateException(
+          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
     }
 
-    configureKafkaIO(kafkaAddress + ":9092", binder);
+    configureKafkaIO(kafkaBootstrapServers, binder);
     configureCommandRouter(binder);
     configureCommandResolverFunctions(binder);
   }
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 33ac127..1bd08c7 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -55,18 +55,19 @@ public class SanityVerificationE2E {
   private static final Logger LOG = LoggerFactory.getLogger(SanityVerificationE2E.class);
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+  private static final String KAFKA_HOST = "kafka-broker";
 
   @Rule
   public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
-          .withNetworkAliases(Constants.KAFKA_BROKER_HOST);
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
       new StatefulFunctionsAppContainers("sanity-verification", 2)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
-          .withModuleGlobalConfiguration("kafka-broker", Constants.KAFKA_BROKER_HOST);
+          .withModuleGlobalConfiguration(
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {