You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/25 05:55:20 UTC

[flink-statefun] branch master updated: [FLINK-16149][hotfix][core] Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new e9b8bcb  [FLINK-16149][hotfix][core] Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment
e9b8bcb is described below

commit e9b8bcbad0d9798c2543af97a848737c10ae5baa
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Feb 24 12:38:12 2020 -0600

    [FLINK-16149][hotfix][core] Use GlobalConfiguration.loadConfiguration() on StreamPlanEnvironment
    
    Because the statefun-flink-launcher is based on JobClusterEntrypoint it
    pulls the StreamGraph using a StreamPlanEnvironment. In this case, the
    StreamExecutionEnvironment configuration is set to an empty object.
    
    The only way to pull the actual flink-conf in this case is via
    GlobalConfiguration.loadConfiguration(). This is safe because
    StreamPlanEnvironment implies job based deployment. In all other cases
    the configuration is still pulled via reflection.
    
    This closes #32.
---
 .../statefun-sanity-itcase/pom.xml                 | 13 ++++++
 .../itcases/sanity/SanityVerificationModule.java   | 13 +++---
 .../itcases/sanity/SanityVerificationITCase.java   |  9 ++++-
 .../testutils/StatefulFunctionsAppContainers.java  | 47 +++++++++++++++++++---
 .../flink/core/StatefulFunctionsConfig.java        |  6 ++-
 5 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
index 02eaadd..ab5d842 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
@@ -29,6 +29,7 @@ under the License.
 
     <properties>
         <testcontainers.version>1.12.5</testcontainers.version>
+        <flink.version>1.10.0</flink.version>
     </properties>
 
     <dependencies>
@@ -76,6 +77,10 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -84,6 +89,14 @@ under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- Flink Config -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
index 6a586dd..14b9ea8 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/main/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationModule.java
@@ -36,17 +36,20 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 @AutoService(StatefulFunctionModule.class)
 public class SanityVerificationModule implements StatefulFunctionModule {
 
-  private static final String KAFKA_ADDRESS = Constants.KAFKA_BROKER_HOST + ":9092";
-
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    configureKafkaIO(binder);
+    String kafkaAddress = globalConfiguration.get("kafka-broker");
+    if (kafkaAddress == null) {
+      throw new IllegalStateException("Missing required global configuration kafka-broker");
+    }
+
+    configureKafkaIO(kafkaAddress + ":9092", binder);
     configureCommandRouter(binder);
     configureCommandResolverFunctions(binder);
   }
 
-  private static void configureKafkaIO(Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(KAFKA_ADDRESS);
+  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
     binder.bindIngress(kafkaIO.getIngressSpec());
     binder.bindEgress(kafkaIO.getEgressSpec());
   }
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
index 621ea72..d383fe8 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
@@ -61,6 +62,12 @@ public class SanityVerificationITCase {
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
 
+  private static final Configuration flinkConf = new Configuration();
+
+  static {
+    flinkConf.setString("statefun.module.global-config.kafka-broker", Constants.KAFKA_BROKER_HOST);
+  }
+
   @Rule
   public KafkaContainer kafka =
       new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
@@ -68,7 +75,7 @@ public class SanityVerificationITCase {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("sanity-verification", 2)
+      new StatefulFunctionsAppContainers("sanity-verification", 2, flinkConf)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG);
 
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
index 5757273..09a3113 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.statefun.itcases.sanity.testutils;
 
+import java.io.File;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,6 +118,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private final List<GenericContainer> workers;
 
   public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+    this(appName, numWorkers, null);
+  }
+
+  public StatefulFunctionsAppContainers(
+      String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
     if (appName == null || appName.isEmpty()) {
       throw new IllegalArgumentException(
           "App name must be non-empty. This is used as the application image name.");
@@ -122,7 +133,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
     this.network = Network.newNetwork();
 
-    final ImageFromDockerfile appImage = appImage(appName);
+    final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
     this.master = masterContainer(appImage, network);
     this.workers = workerContainers(appImage, numWorkers, network);
   }
@@ -150,7 +161,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(String appName) {
+  private static ImageFromDockerfile appImage(
+      String appName, @Nullable Configuration dynamicProperties) {
     final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
     LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
 
@@ -158,16 +170,39 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
         new ImageFromDockerfile(appName)
             .withFileFromClasspath("Dockerfile", "Dockerfile")
             .withFileFromPath(".", targetDirPath);
-    if (flinkConfYamlExists()) {
-      appImage.withFileFromClasspath("flink-conf.yaml", "flink-conf.yaml");
+
+    Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
+    if (flinkConf != null) {
+      appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
     }
 
     return appImage;
   }
 
-  private static boolean flinkConfYamlExists() {
+  private static @Nullable Configuration loadFlinkConfIfAvailable(
+      @Nullable Configuration dynamicProperties) {
     final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
-    return flinkConfUrl != null;
+    if (flinkConfUrl == null) {
+      return dynamicProperties;
+    }
+
+    final String flinkConfDir;
+    try {
+      flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Failed to load flink-conf.yaml", e);
+    }
+
+    return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+  }
+
+  private static String flinkConfigAsString(Configuration configuration) {
+    StringBuilder yaml = new StringBuilder();
+    for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+      yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+    }
+
+    return yaml.toString();
   }
 
   private static GenericContainer masterContainer(ImageFromDockerfile appImage, Network network) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 688c705..969a194 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -30,11 +30,13 @@ import java.util.Objects;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamPlanEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 
 /** Configuration that captures all stateful function related settings. */
@@ -103,8 +105,10 @@ public class StatefulFunctionsConfig implements Serializable {
     return new StatefulFunctionsConfig(configuration);
   }
 
-  @SuppressWarnings("JavaReflectionMemberAccess")
   private static Configuration getConfiguration(StreamExecutionEnvironment env) {
+    if (env instanceof StreamPlanEnvironment) {
+      return GlobalConfiguration.loadConfiguration();
+    }
     try {
       Method getConfiguration =
           StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");