You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/29 03:51:12 UTC
[flink-statefun] 03/05: [FLINK-16274] [e2e] Add builder methods for
flink-conf.yaml settings
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 5c82fe729e5b23687b904fa98af2acb1d82d0a00
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 16:25:33 2020 +0800
[FLINK-16274] [e2e] Add builder methods for flink-conf.yaml settings
This commit adds the following methods:
* withModuleGlobalConfiguration(String, String)
* <T> withConfiguration(ConfigOption<T>, T)
These methods build up a Configuration object holding dynamic
properties. These properties will be merged with a base flink-conf.yaml
that defines the template settings.
This commit also eliminates the need for users of
StatefulFunctionsAppContainers to require providing a flink-conf.yaml,
and can therefore be set completely programmatically.
---
.../statefun-e2e-tests-common/pom.xml | 12 ++++
.../e2e/common/StatefulFunctionsAppContainers.java | 80 ++++++++++++++--------
.../src/main/resources/flink-conf.yaml | 22 ++++++
.../flink/core/StatefulFunctionsConfig.java | 2 +-
4 files changed, 87 insertions(+), 29 deletions(-)
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index cf50a29..7f4801b 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -34,6 +34,14 @@ under the License.
</properties>
<dependencies>
+ <!-- Stateful Functions core -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -77,6 +85,10 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index 21b4e69..f7e284d 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -19,16 +19,20 @@
package org.apache.flink.statefun.e2e.common;
import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.io.InputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +121,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
private final int numWorkers;
private final Network network;
- private final Configuration dynamicProperties;
+ private final Configuration dynamicProperties = new Configuration();
private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
private Logger masterLogger;
@@ -125,11 +129,6 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
private List<GenericContainer<?>> workers;
public StatefulFunctionsAppContainers(String appName, int numWorkers) {
- this(appName, numWorkers, null);
- }
-
- public StatefulFunctionsAppContainers(
- String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
if (appName == null || appName.isEmpty()) {
throw new IllegalArgumentException(
"App name must be non-empty. This is used as the application image name.");
@@ -141,7 +140,10 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
this.network = Network.newNetwork();
this.appName = appName;
this.numWorkers = numWorkers;
- this.dynamicProperties = dynamicProperties;
+
+ if (numWorkers > 1) {
+ dynamicProperties.set(CoreOptions.DEFAULT_PARALLELISM, numWorkers);
+ }
}
public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
@@ -155,6 +157,16 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
return this;
}
+ public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) {
+ this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
+ return this;
+ }
+
+ public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) {
+ this.dynamicProperties.set(config, value);
+ return this;
+ }
+
@Override
protected void before() throws Throwable {
final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
@@ -171,8 +183,7 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
workers.forEach(GenericContainer::stop);
}
- private static ImageFromDockerfile appImage(
- String appName, @Nullable Configuration dynamicProperties) {
+ private static ImageFromDockerfile appImage(String appName, Configuration dynamicProperties) {
final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
@@ -181,29 +192,30 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
.withFileFromClasspath("Dockerfile", "Dockerfile")
.withFileFromPath(".", targetDirPath);
- Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
- if (flinkConf != null) {
- appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
- }
+ Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+ String flinkConfString = flinkConfigAsString(flinkConf);
+ LOG.info(
+ "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
+ flinkConf);
+ appImage.withFileFromString("flink-conf.yaml", flinkConfString);
return appImage;
}
- private static @Nullable Configuration loadFlinkConfIfAvailable(
- @Nullable Configuration dynamicProperties) {
- final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
- if (flinkConfUrl == null) {
- return dynamicProperties;
- }
-
- final String flinkConfDir;
- try {
- flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
- } catch (URISyntaxException e) {
- throw new RuntimeException("Failed to load flink-conf.yaml", e);
+ /**
+ * Merges set dynamic properties with configuration in the base flink-conf.yaml located in
+ * resources.
+ */
+ private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
+ final InputStream baseFlinkConfResourceInputStream =
+ StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+ if (baseFlinkConfResourceInputStream == null) {
+ throw new RuntimeException("Base flink-conf.yaml cannot be found.");
}
- return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+ final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+ return GlobalConfiguration.loadConfiguration(
+ tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
}
private static String flinkConfigAsString(Configuration configuration) {
@@ -215,6 +227,18 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
return yaml.toString();
}
+ private static File copyToTempFlinkConfFile(InputStream inputStream) {
+ try {
+ final File tempFile =
+ new File(
+ Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
+ Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ return tempFile;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static GenericContainer<?> masterContainer(
ImageFromDockerfile appImage,
Network network,
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
new file mode 100644
index 0000000..4af978f
--- /dev/null
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+taskmanager.memory.process.size: 4g
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 969a194..ce40be0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -45,7 +45,7 @@ public class StatefulFunctionsConfig implements Serializable {
private static final long serialVersionUID = 1L;
- private static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
+ public static final String MODULE_CONFIG_PREFIX = "statefun.module.global-config.";
// This configuration option exists for the documentation generator
@SuppressWarnings("unused")