You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/25 05:55:20 UTC
[flink-statefun] branch master updated: [FLINK-16149][hotfix][core]
Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push:
new e9b8bcb [FLINK-16149][hotfix][core] Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment
e9b8bcb is described below
commit e9b8bcbad0d9798c2543af97a848737c10ae5baa
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Feb 24 12:38:12 2020 -0600
[FLINK-16149][hotfix][core] Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment
Because the statefun-flink-launcher is based on JobClusterEntrypoint, it
pulls the StreamGraph using a StreamPlanEnvironment. In this case, the
StreamExecutionEnvironment configuration is set to an empty object.
The only way to pull the actual flink-conf in this case is via
GlobalConfiguration.loadConfiguration(). This is safe because
StreamPlanEnvironment implies job based deployment. In all other cases
the configuration is still pulled via reflection.
This closes #32.
---
.../statefun-sanity-itcase/pom.xml | 13 ++++++
.../itcases/sanity/SanityVerificationModule.java | 13 +++---
.../itcases/sanity/SanityVerificationITCase.java | 9 ++++-
.../testutils/StatefulFunctionsAppContainers.java | 47 +++++++++++++++++++---
.../flink/core/StatefulFunctionsConfig.java | 6 ++-
5 files changed, 75 insertions(+), 13 deletions(-)
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
index 02eaadd..ab5d842 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
@@ -29,6 +29,7 @@ under the License.
<properties>
<testcontainers.version>1.12.5</testcontainers.version>
+ <flink.version>1.10.0</flink.version>
</properties>
<dependencies>
@@ -76,6 +77,10 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -84,6 +89,14 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- Flink Config -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
index 6a586dd..14b9ea8 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
@@ -36,17 +36,20 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
@AutoService(StatefulFunctionModule.class)
public class SanityVerificationModule implements StatefulFunctionModule {
- private static final String KAFKA_ADDRESS = Constants.KAFKA_BROKER_HOST + ":9092";
-
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
- configureKafkaIO(binder);
+ String kafkaAddress = globalConfiguration.get("kafka-broker");
+ if (kafkaAddress == null) {
+ throw new IllegalStateException("Missing required global configuration kafka-broker");
+ }
+
+ configureKafkaIO(kafkaAddress + ":9092", binder);
configureCommandRouter(binder);
configureCommandResolverFunctions(binder);
}
- private static void configureKafkaIO(Binder binder) {
- final KafkaIO kafkaIO = new KafkaIO(KAFKA_ADDRESS);
+ private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+ final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
binder.bindIngress(kafkaIO.getIngressSpec());
binder.bindEgress(kafkaIO.getEgressSpec());
}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
index 621ea72..d383fe8 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
@@ -61,6 +62,12 @@ public class SanityVerificationITCase {
private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
+ private static final Configuration flinkConf = new Configuration();
+
+ static {
+ flinkConf.setString("statefun.module.global-config.kafka-broker", Constants.KAFKA_BROKER_HOST);
+ }
+
@Rule
public KafkaContainer kafka =
new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
@@ -68,7 +75,7 @@ public class SanityVerificationITCase {
@Rule
public StatefulFunctionsAppContainers verificationApp =
- new StatefulFunctionsAppContainers("sanity-verification", 2)
+ new StatefulFunctionsAppContainers("sanity-verification", 2, flinkConf)
.dependsOn(kafka)
.exposeMasterLogs(LOG);
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
index 5757273..09a3113 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
@@ -18,11 +18,17 @@
package org.apache.flink.statefun.itcases.sanity.testutils;
+import java.io.File;
+import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +118,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
private final List<GenericContainer> workers;
public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+ this(appName, numWorkers, null);
+ }
+
+ public StatefulFunctionsAppContainers(
+ String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
if (appName == null || appName.isEmpty()) {
throw new IllegalArgumentException(
"App name must be non-empty. This is used as the application image name.");
@@ -122,7 +133,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
this.network = Network.newNetwork();
- final ImageFromDockerfile appImage = appImage(appName);
+ final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
this.master = masterContainer(appImage, network);
this.workers = workerContainers(appImage, numWorkers, network);
}
@@ -150,7 +161,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
workers.forEach(GenericContainer::stop);
}
- private static ImageFromDockerfile appImage(String appName) {
+ private static ImageFromDockerfile appImage(
+ String appName, @Nullable Configuration dynamicProperties) {
final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
@@ -158,16 +170,39 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
new ImageFromDockerfile(appName)
.withFileFromClasspath("Dockerfile", "Dockerfile")
.withFileFromPath(".", targetDirPath);
- if (flinkConfYamlExists()) {
- appImage.withFileFromClasspath("flink-conf.yaml", "flink-conf.yaml");
+
+ Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
+ if (flinkConf != null) {
+ appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
}
return appImage;
}
- private static boolean flinkConfYamlExists() {
+ private static @Nullable Configuration loadFlinkConfIfAvailable(
+ @Nullable Configuration dynamicProperties) {
final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
- return flinkConfUrl != null;
+ if (flinkConfUrl == null) {
+ return dynamicProperties;
+ }
+
+ final String flinkConfDir;
+ try {
+ flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Failed to load flink-conf.yaml", e);
+ }
+
+ return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+ }
+
+ private static String flinkConfigAsString(Configuration configuration) {
+ StringBuilder yaml = new StringBuilder();
+ for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+ yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+ }
+
+ return yaml.toString();
}
private static GenericContainer masterContainer(ImageFromDockerfile appImage, Network network) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 688c705..969a194 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -30,11 +30,13 @@ import java.util.Objects;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamPlanEnvironment;
import org.apache.flink.util.InstantiationUtil;
/** Configuration that captures all stateful function related settings. */
@@ -103,8 +105,10 @@ public class StatefulFunctionsConfig implements Serializable {
return new StatefulFunctionsConfig(configuration);
}
- @SuppressWarnings("JavaReflectionMemberAccess")
private static Configuration getConfiguration(StreamExecutionEnvironment env) {
+ if (env instanceof StreamPlanEnvironment) {
+ return GlobalConfiguration.loadConfiguration();
+ }
try {
Method getConfiguration =
StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");