You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/29 03:51:14 UTC

[flink-statefun] 05/05: [FLINK-16274] Consolidate constants for Kafka bootstrap server configuration

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e2305e866d363c4fbced154926af6b8df7d7c979
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:49:59 2020 +0800

    [FLINK-16274] Consolidate constants for Kafka bootstrap server configuration
    
    This closes #37.
---
 .../java/org/apache/flink/statefun/e2e/sanity/Constants.java     | 2 +-
 .../flink/statefun/e2e/sanity/SanityVerificationModule.java      | 9 +++++----
 .../apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java  | 7 ++++---
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
index b1d16dd..a17f229 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
@@ -28,7 +28,7 @@ final class Constants {
 
   private Constants() {}
 
-  static final String KAFKA_BROKER_HOST = "kafka-broker";
+  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
 
   static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
       new IngressIdentifier<>(Command.class, "org.apache.flink.itcases.sanity", "commands");
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
index 0802265..519d1c4 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
@@ -38,12 +38,13 @@ public class SanityVerificationModule implements StatefulFunctionModule {
 
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaAddress = globalConfiguration.get("kafka-broker");
-    if (kafkaAddress == null) {
-      throw new IllegalStateException("Missing required global configuration kafka-broker");
+    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    if (kafkaBootstrapServers == null) {
+      throw new IllegalStateException(
+          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
     }
 
-    configureKafkaIO(kafkaAddress + ":9092", binder);
+    configureKafkaIO(kafkaBootstrapServers, binder);
     configureCommandRouter(binder);
     configureCommandResolverFunctions(binder);
   }
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 33ac127..1bd08c7 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -55,18 +55,19 @@ public class SanityVerificationE2E {
   private static final Logger LOG = LoggerFactory.getLogger(SanityVerificationE2E.class);
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+  private static final String KAFKA_HOST = "kafka-broker";
 
   @Rule
   public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
-          .withNetworkAliases(Constants.KAFKA_BROKER_HOST);
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
       new StatefulFunctionsAppContainers("sanity-verification", 2)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
-          .withModuleGlobalConfiguration("kafka-broker", Constants.KAFKA_BROKER_HOST);
+          .withModuleGlobalConfiguration(
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {