You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/24 10:03:38 UTC
[flink-statefun] 03/06: [FLINK-16159] [tests] Introduce
StatefulFunctionsAppContainers test rule
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f69d15d78e97ecc0e2326599e8086c9189031c3b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 16:35:05 2020 +0800
[FLINK-16159] [tests] Introduce StatefulFunctionsAppContainers test rule
StatefulFunctionsAppContainers is a JUnit test rule that sets up a
containerized Stateful Functions application using Testcontainers. This
makes composing end-to-end tests for Stateful Functions applications
easier, by managing the containerized application as an external test
resource whose lifecycle is integrated with the JUnit test framework.
---
.../testutils/StatefulFunctionsAppContainers.java | 200 +++++++++++++++++++++
1 file changed, 200 insertions(+)
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
new file mode 100644
index 0000000..5757273
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity.testutils;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * A JUnit {@link org.junit.rules.TestRule} that sets up a containerized Stateful Functions
+ * application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This makes
+ * composing end-to-end tests for Stateful Functions applications easier, by managing the
+ * containerized application as an external test resource whose lifecycle is integrated with the
+ * JUnit test framework.
+ *
+ * <h2>Example usage</h2>
+ *
+ * <pre>{@code
+ * public class MyITCase {
+ *
+ * {@code @Rule}
+ * public StatefulFunctionsAppContainers myApp =
+ * new StatefulFunctionsAppContainers("app-name", 3);
+ *
+ * {@code @Test}
+ * public void runTest() {
+ * // the containers for the app, including master and workers, will already be running
+ * // before the test is run; implement your test logic against the app
+ * }
+ * }
+ * }</pre>
+ *
+ * <p>In most cases you'd also need to start an additional system for the test, for example starting
+ * a container that runs Kafka, which the application depends on as an ingress or egress. The
+ * following demonstrates adding a Kafka container to the setup:
+ *
+ * <pre>{@code
+ * public class MyKafkaITCase {
+ *
+ * {@code @Rule}
+ * public KafkaContainer kafka = new KafkaContainer();
+ *
+ * {@code @Rule}
+ * public StatefulFunctionsAppContainers myApp =
+ * new StatefulFunctionsAppContainers("app-name", 3)
+ * .dependsOn(kafka);
+ *
+ * ...
+ * }
+ * }</pre>
+ *
+ * <p>Application master and worker containers will always be started after containers that are
+ * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * depended on will also be set up such that they share the same network with the master and
+ * workers, so that they can freely communicate with each other.
+ *
+ * <h2>Prerequisites</h2>
+ *
+ * <p>Since Testcontainers uses Docker, it is required that you have Docker installed for this test
+ * rule to work.
+ *
+ * <p>When building the Docker image for the Stateful Functions application under test, the
+ * following files are added to the build context:
+ *
+ * <ul>
+ * <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
+ * to be present. A simple way is to add the Dockerfile to the test resources directory. This
+ * will be added to the root of the Docker image build context.
+ * <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
+ * any. You can also add this to the test resources directory. This will be added to the root
+ * of the Docker image build context.
+ * <li>All built artifacts under the generated {@code target} folder for the project module that
+ * the test resides in. This is required to be present, which means that the tests can
+ * only be run after the artifacts are built. The built artifacts are added to the root of the
+ * Docker image build context.
+ * </ul>
+ */
+public final class StatefulFunctionsAppContainers extends ExternalResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
+
+  /** Network alias of the master; also passed to all containers as env {@code MASTER_HOST}. */
+  private static final String MASTER_HOST = "statefun-app-master";
+
+  /** Prefix of the worker network aliases; see {@link #workerHostOf(int)}. */
+  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+  /** Network shared by the master, the workers, and every container added via dependsOn. */
+  private final Network network;
+
+  private final GenericContainer<?> master;
+  private final List<GenericContainer<?>> workers;
+
+  /**
+   * Creates the (not yet started) master and worker containers for the application.
+   *
+   * @param appName name used as the application image name; must be non-empty.
+   * @param numWorkers number of worker containers to create; must be at least 1.
+   * @throws IllegalArgumentException if {@code appName} is null or empty, or if {@code numWorkers}
+   *     is less than 1.
+   */
+  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+    if (appName == null || appName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "App name must be non-empty. This is used as the application image name.");
+    }
+    if (numWorkers < 1) {
+      throw new IllegalArgumentException("Must have at least 1 worker.");
+    }
+
+    this.network = Network.newNetwork();
+
+    final ImageFromDockerfile appImage = appImage(appName);
+    this.master = masterContainer(appImage, network);
+    this.workers = workerContainers(appImage, numWorkers, network);
+  }
+
+  /**
+   * Attaches the given container to the application's network and registers it as a dependency of
+   * the master container.
+   *
+   * @param container the container that the application depends on.
+   * @return this instance, for call chaining.
+   */
+  public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
+    container.withNetwork(network);
+    master.dependsOn(container);
+    return this;
+  }
+
+  /**
+   * Registers a log consumer on the master container that forwards its output to the given logger.
+   *
+   * @param logger the SLF4J logger to forward the master container's logs to.
+   * @return this instance, for call chaining.
+   */
+  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
+    master.withLogConsumer(new Slf4jLogConsumer(logger, true));
+    return this;
+  }
+
+  /**
+   * Starts the master container, then each worker container in order.
+   *
+   * <p>If any container fails to start, the containers are stopped again before the failure is
+   * rethrown.
+   */
+  @Override
+  protected void before() throws Throwable {
+    try {
+      master.start();
+      workers.forEach(GenericContainer::start);
+    } catch (Throwable startFailure) {
+      // ExternalResource does not call after() when before() throws, so clean up here to avoid
+      // leaving already-started containers running.
+      try {
+        after();
+      } catch (Throwable cleanupFailure) {
+        startFailure.addSuppressed(cleanupFailure);
+      }
+      throw startFailure;
+    }
+  }
+
+  /** Stops the master container and all worker containers. */
+  @Override
+  protected void after() {
+    try {
+      master.stop();
+    } finally {
+      // still attempt to stop the workers even if stopping the master failed
+      workers.forEach(GenericContainer::stop);
+    }
+  }
+
+  /**
+   * Defines the application image: the classpath {@code Dockerfile}, the contents of the module's
+   * {@code target} directory (resolved against the {@code user.dir} system property), and, if
+   * present on the classpath, {@code flink-conf.yaml}.
+   */
+  private static ImageFromDockerfile appImage(String appName) {
+    final Path targetDirPath = Paths.get(System.getProperty("user.dir"), "target");
+    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+    final ImageFromDockerfile appImage =
+        new ImageFromDockerfile(appName)
+            .withFileFromClasspath("Dockerfile", "Dockerfile")
+            .withFileFromPath(".", targetDirPath);
+    if (flinkConfYamlExists()) {
+      appImage.withFileFromClasspath("flink-conf.yaml", "flink-conf.yaml");
+    }
+
+    return appImage;
+  }
+
+  /** Returns whether a {@code /flink-conf.yaml} resource can be found on the classpath. */
+  private static boolean flinkConfYamlExists() {
+    final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
+    return flinkConfUrl != null;
+  }
+
+  /** Creates the master container, reachable on the network under {@link #MASTER_HOST}. */
+  private static GenericContainer<?> masterContainer(
+      ImageFromDockerfile appImage, Network network) {
+    return new GenericContainer<>(appImage)
+        .withNetwork(network)
+        .withNetworkAliases(MASTER_HOST)
+        .withEnv("ROLE", "master")
+        .withEnv("MASTER_HOST", MASTER_HOST);
+  }
+
+  /** Creates {@code numWorkers} worker containers, each with its own indexed network alias. */
+  private static List<GenericContainer<?>> workerContainers(
+      ImageFromDockerfile appImage, int numWorkers, Network network) {
+    final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+
+    for (int i = 0; i < numWorkers; i++) {
+      workers.add(
+          new GenericContainer<>(appImage)
+              .withNetwork(network)
+              .withNetworkAliases(workerHostOf(i))
+              .withEnv("ROLE", "worker")
+              .withEnv("MASTER_HOST", MASTER_HOST));
+    }
+
+    return workers;
+  }
+
+  /** Returns the network alias of the worker with the given index. */
+  private static String workerHostOf(int workerIndex) {
+    return WORKER_HOST_PREFIX + "-" + workerIndex;
+  }
+}