You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/29 03:51:12 UTC

[flink-statefun] 03/05: [FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 5c82fe729e5b23687b904fa98af2acb1d82d0a00
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:25:33 2020 +0800

    [FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings
    
    This commit adds the following methods:
    * withModuleGlobalConfiguration(String, String)
    * <T> withConfiguration(ConfigOption<T>, T)
    
    These methods build up a Configuration object holding dynamic
    properties. These properties will be merged with a base flink-conf.yaml
    that defines the template settings.
    
    This commit also removes the requirement for users of
    StatefulFunctionsAppContainers to provide a flink-conf.yaml; the Flink
    configuration can therefore be set completely programmatically.
---
 .../statefun-e2e-tests-common/pom.xml              | 12 ++++
 .../e2e/common/StatefulFunctionsAppContainers.java | 80 ++++++++++++++--------
 .../src/main/resources/flink-conf.yaml             | 22 ++++++
 .../flink/core/StatefulFunctionsConfig.java        |  2 +-
 4 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index cf50a29..7f4801b 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -34,6 +34,14 @@ under the License.
     </properties>
 
     <dependencies>
+        <!-- Stateful Functions core -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Protobuf -->
         <dependency>
             <groupId>com.google.protobuf</groupId>
@@ -77,6 +85,10 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
     </dependencies>
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 21b4e69..f7e284d 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -19,16 +19,20 @@
 package org.apache.flink.statefun.e2e.common;
 
 import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +121,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private final int numWorkers;
   private final Network network;
 
-  private final Configuration dynamicProperties;
+  private final Configuration dynamicProperties = new Configuration();
   private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
   private Logger masterLogger;
 
@@ -125,11 +129,6 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
   private List<GenericContainer<?>> workers;
 
   public StatefulFunctionsAppContainers(String appName, int numWorkers) {
-    this(appName, numWorkers, null);
-  }
-
-  public StatefulFunctionsAppContainers(
-      String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
     if (appName == null || appName.isEmpty()) {
       throw new IllegalArgumentException(
           "App name must be non-empty. This is used as the application image name.");
@@ -141,7 +140,10 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     this.network = Network.newNetwork();
     this.appName = appName;
     this.numWorkers = numWorkers;
-    this.dynamicProperties = dynamicProperties;
+
+    if (numWorkers > 1) {
+      dynamicProperties.set(CoreOptions.DEFAULT_PARALLELISM, numWorkers);
+    }
   }
 
   public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
@@ -155,6 +157,16 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return this;
   }
 
+  public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) {
+    this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
+    return this;
+  }
+
+  public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) {
+    this.dynamicProperties.set(config, value);
+    return this;
+  }
+
   @Override
   protected void before() throws Throwable {
     final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
@@ -171,8 +183,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(
-      String appName, @Nullable Configuration dynamicProperties) {
+  private static ImageFromDockerfile appImage(String appName, Configuration dynamicProperties) {
     final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
     LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
 
@@ -181,29 +192,30 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
             .withFileFromClasspath("Dockerfile", "Dockerfile")
             .withFileFromPath(".", targetDirPath);
 
-    Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
-    if (flinkConf != null) {
-      appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
-    }
+    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+    String flinkConfString = flinkConfigAsString(flinkConf);
+    LOG.info(
+        "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
+        flinkConf);
+    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
 
     return appImage;
   }
 
-  private static @Nullable Configuration loadFlinkConfIfAvailable(
-      @Nullable Configuration dynamicProperties) {
-    final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
-    if (flinkConfUrl == null) {
-      return dynamicProperties;
-    }
-
-    final String flinkConfDir;
-    try {
-      flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Failed to load flink-conf.yaml", e);
+  /**
+   * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
+   * resources.
+   */
+  private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
+    final InputStream baseFlinkConfResourceInputStream =
+        StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+    if (baseFlinkConfResourceInputStream == null) {
+      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
     }
 
-    return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+    final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+    return GlobalConfiguration.loadConfiguration(
+        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
   }
 
   private static String flinkConfigAsString(Configuration configuration) {
@@ -215,6 +227,18 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     return yaml.toString();
   }
 
+  private static File copyToTempFlinkConfFile(InputStream inputStream) {
+    try {
+      final File tempFile =
+          new File(
+              Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
+      Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+      return tempFile;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static GenericContainer<?> masterContainer(
       ImageFromDockerfile appImage,
       Network network,
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
new file mode 100644
index 0000000..4af978f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+taskmanager.memory.process.size: 4g
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 969a194..ce40be0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -45,7 +45,7 @@ public class StatefulFunctionsConfig implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
+  public static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
 
   // This configuration option exists for the documentation generator
   @SuppressWarnings("unused")