You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/24 10:03:38 UTC

[flink-statefun] 03/06: [FLINK-16159] [tests] Introduce StatefulFunctionsAppContainers test rule

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit f69d15d78e97ecc0e2326599e8086c9189031c3b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 16:35:05 2020 +0800

    [FLINK-16159] [tests] Introduce StatefulFunctionsAppContainers test rule
    
    StatefulFunctionsAppContainers is a JUnit test rule that setups a
    containerized Stateful Functions application using Testcontainers. This
    allows composing end-to-end tests for Stateful Functions applications
    easier, by managing the containeraized applicaition as an external test
    resource whose lifecycle is integrated with the JUnit test framework.
---
 .../testutils/StatefulFunctionsAppContainers.java  | 200 +++++++++++++++++++++
 1 file changed, 200 insertions(+)

diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
new file mode 100644
index 0000000..5757273
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/StatefulFunctionsAppContainers.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity.testutils;
+
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * A JUnit {@link org.junit.rules.TestRule} that setups a containerized Stateful Functions
+ * application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This allows
+ * composing end-to-end tests for Stateful Functions applications easier, by managing the
+ * containerized application as an external test resource whose lifecycle is integrated with the
+ * JUnit test framework.
+ *
+ * <h2>Example usage</h2>
+ *
+ * <pre>{@code
+ * public class MyITCase {
+ *
+ *     {@code @Rule}
+ *     public StatefulFunctionsAppContainers myApp =
+ *         new StatefulFunctionsAppContainers("app-name", 3);
+ *
+ *     {@code @Test}
+ *     public void runTest() {
+ *         // the containers for the app, including master and workers, will already be running
+ *         // before the test is run; implement your test logic against the app
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p>In most cases you'd also need to start an additional system for the test, for example starting
+ * a container that runs Kafka from which the application depends on as an ingress or egress. The
+ * following demonstrates adding a Kafka container to the setup:
+ *
+ * <pre>{@code
+ * public class MyKafkaITCase {
+ *
+ *     {@code @Rule}
+ *     public KafkaContainer kafka = new KafkaContainer();
+ *
+ *     {@code @Rule}
+ *     public StatefulFunctionsAppContainers myApp =
+ *         new StatefulFunctionsAppContainers("app-name", 3)
+ *             .dependsOn(kafka);
+ *
+ *     ...
+ * }
+ * }</pre>
+ *
+ * <p>Application master and worker containers will always be started after containers that are
+ * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * depended on will also be setup such that they share the same network with the master and workers,
+ * so that they can freely communicate with each other.
+ *
+ * <h2>Prerequisites</h2>
+ *
+ * <p>Since Testcontainers uses Docker, it is required that you have Docker installed for this test
+ * rule to work.
+ *
+ * <p>When building the Docker image for the Stateful Functions application under test, the
+ * following files are added to the build context:
+ *
+ * <uL>
+ *   <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
+ *       to be present. A simple way is to add the Dockerfile to the test resources directory. This
+ *       will be added to the root of the Docker image build context.
+ *   <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
+ *       any. You can also add this to the test resources directory. This will be added to the root
+ *       of the Docker image build context.
+ *   <li>All built artifacts under the generated {@code target} folder for the project module that
+ *       the test resides in. This is required to be present, so this entails that the tests can
+ *       only be ran after artifacts are built. The built artifacts are added to the root of the
+ *       Docker image build context.
+ * </uL>
+ */
+public final class StatefulFunctionsAppContainers extends ExternalResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
+
+  private static final String MASTER_HOST = "statefun-app-master";
+  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+  private final Network network;
+  private final GenericContainer master;
+  private final List<GenericContainer> workers;
+
+  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+    if (appName == null || appName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "App name must be non-empty. This is used as the application image name.");
+    }
+    if (numWorkers < 1) {
+      throw new IllegalArgumentException("Must have at least 1 worker.");
+    }
+
+    this.network = Network.newNetwork();
+
+    final ImageFromDockerfile appImage = appImage(appName);
+    this.master = masterContainer(appImage, network);
+    this.workers = workerContainers(appImage, numWorkers, network);
+  }
+
+  public StatefulFunctionsAppContainers dependsOn(GenericContainer container) {
+    container.withNetwork(network);
+    master.dependsOn(container);
+    return this;
+  }
+
+  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
+    master.withLogConsumer(new Slf4jLogConsumer(logger, true));
+    return this;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    master.start();
+    workers.forEach(GenericContainer::start);
+  }
+
+  @Override
+  protected void after() {
+    master.stop();
+    workers.forEach(GenericContainer::stop);
+  }
+
+  private static ImageFromDockerfile appImage(String appName) {
+    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
+    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+    final ImageFromDockerfile appImage =
+        new ImageFromDockerfile(appName)
+            .withFileFromClasspath("Dockerfile", "Dockerfile")
+            .withFileFromPath(".", targetDirPath);
+    if (flinkConfYamlExists()) {
+      appImage.withFileFromClasspath("flink-conf.yaml", "flink-conf.yaml");
+    }
+
+    return appImage;
+  }
+
+  private static boolean flinkConfYamlExists() {
+    final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
+    return flinkConfUrl != null;
+  }
+
+  private static GenericContainer masterContainer(ImageFromDockerfile appImage, Network network) {
+    return new GenericContainer(appImage)
+        .withNetwork(network)
+        .withNetworkAliases(MASTER_HOST)
+        .withEnv("ROLE", "master")
+        .withEnv("MASTER_HOST", MASTER_HOST);
+  }
+
+  private static List<GenericContainer> workerContainers(
+      ImageFromDockerfile appImage, int numWorkers, Network network) {
+    final List<GenericContainer> workers = new ArrayList<>(numWorkers);
+
+    for (int i = 0; i < numWorkers; i++) {
+      workers.add(
+          new GenericContainer(appImage)
+              .withNetwork(network)
+              .withNetworkAliases(workerHostOf(i))
+              .withEnv("ROLE", "worker")
+              .withEnv("MASTER_HOST", MASTER_HOST));
+    }
+
+    return workers;
+  }
+
+  private static String workerHostOf(int workerIndex) {
+    return WORKER_HOST_PREFIX + "-" + workerIndex;
+  }
+}