You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/29 03:51:14 UTC
[flink-statefun] 05/05: [FLINK-16274] Consolidate constants for
Kafka bootstrap server configuration
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit e2305e866d363c4fbced154926af6b8df7d7c979
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:49:59 2020 +0800
[FLINK-16274] Consolidate constants for Kafka bootstrap server configuration
This closes #37.
---
.../java/org/apache/flink/statefun/e2e/sanity/Constants.java | 2 +-
.../flink/statefun/e2e/sanity/SanityVerificationModule.java | 9 +++++----
.../apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java | 7 ++++---
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
index b1d16dd..a17f229 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/Constants.java
@@ -28,7 +28,7 @@ final class Constants {
private Constants() {}
- static final String KAFKA_BROKER_HOST = "kafka-broker";
+ static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
static final IngressIdentifier<Command> COMMAND_INGRESS_ID =
new IngressIdentifier<>(Command.class, "org.apache.flink.itcases.sanity", "commands");
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
index 0802265..519d1c4 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/main/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationModule.java
@@ -38,12 +38,13 @@ public class SanityVerificationModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
- String kafkaAddress = globalConfiguration.get("kafka-broker");
- if (kafkaAddress == null) {
- throw new IllegalStateException("Missing required global configuration kafka-broker");
+ String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+ if (kafkaBootstrapServers == null) {
+ throw new IllegalStateException(
+ "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
}
- configureKafkaIO(kafkaAddress + ":9092", binder);
+ configureKafkaIO(kafkaBootstrapServers, binder);
configureCommandRouter(binder);
configureCommandResolverFunctions(binder);
}
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 33ac127..1bd08c7 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -55,18 +55,19 @@ public class SanityVerificationE2E {
private static final Logger LOG = LoggerFactory.getLogger(SanityVerificationE2E.class);
private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+ private static final String KAFKA_HOST = "kafka-broker";
@Rule
public KafkaContainer kafka =
- new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
- .withNetworkAliases(Constants.KAFKA_BROKER_HOST);
+ new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
@Rule
public StatefulFunctionsAppContainers verificationApp =
new StatefulFunctionsAppContainers("sanity-verification", 2)
.dependsOn(kafka)
.exposeMasterLogs(LOG)
- .withModuleGlobalConfiguration("kafka-broker", Constants.KAFKA_BROKER_HOST);
+ .withModuleGlobalConfiguration(
+ Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
@Test(timeout = 60_000L)
public void run() throws Exception {