You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:47 UTC
[flink-statefun] 03/06: [FLINK-16272] [e2e] Introduce
statefun-end-to-end-tests-common module
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 5ffa36ed9c1aca40109e9cf13c4a82b4605b3657
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 15:08:02 2020 +0800
[FLINK-16272] [e2e] Introduce statefun-end-to-end-tests-common module
All common end-to-end test utilities should be in this new module.
Currently, it only has:
* StatefulFunctionsAppContainers
* KafkaIOVerifier
* KafkaProtobufSerializer
---
statefun-end-to-end-tests/pom.xml | 1 +
.../statefun-end-to-end-tests-common/pom.xml | 83 ++++++++
.../common/StatefulFunctionsAppContainers.java | 236 +++++++++++++++++++++
.../itcases/common/kafka/KafkaIOVerifier.java | 164 ++++++++++++++
.../common/kafka/KafkaProtobufSerializer.java | 61 ++++++
5 files changed, 545 insertions(+)
diff --git a/statefun-end-to-end-tests/pom.xml b/statefun-end-to-end-tests/pom.xml
index 5a79d9f..2a4b0ce 100644
--- a/statefun-end-to-end-tests/pom.xml
+++ b/statefun-end-to-end-tests/pom.xml
@@ -29,6 +29,7 @@ under the License.
<packaging>pom</packaging>
<modules>
+ <module>statefun-end-to-end-tests-common</module>
<module>statefun-sanity-itcase</module>
</modules>
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml
new file mode 100644
index 0000000..9cf6798
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>statefun-end-to-end-tests-common</artifactId>
+
+ <properties>
+ <testcontainers.version>1.12.5</testcontainers.version>
+ <flink.version>1.10.0</flink.version>
+ <kafka.version>2.2.0</kafka.version>
+ </properties>
+
+ <dependencies>
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Testcontainers -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Flink Config -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Kafka client -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java
new file mode 100644
index 0000000..b101fcd
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * A JUnit {@link org.junit.rules.TestRule} that setups a containerized Stateful Functions
+ * application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This allows
+ * composing end-to-end tests for Stateful Functions applications easier, by managing the
+ * containerized application as an external test resource whose lifecycle is integrated with the
+ * JUnit test framework.
+ *
+ * <h2>Example usage</h2>
+ *
+ * <pre>{@code
+ * public class MyITCase {
+ *
+ * {@code @Rule}
+ * public StatefulFunctionsAppContainers myApp =
+ * new StatefulFunctionsAppContainers("app-name", 3);
+ *
+ * {@code @Test}
+ * public void runTest() {
+ * // the containers for the app, including master and workers, will already be running
+ * // before the test is run; implement your test logic against the app
+ * }
+ * }
+ * }</pre>
+ *
+ * <p>In most cases you'd also need to start an additional system for the test, for example starting
+ * a container that runs Kafka from which the application depends on as an ingress or egress. The
+ * following demonstrates adding a Kafka container to the setup:
+ *
+ * <pre>{@code
+ * public class MyKafkaITCase {
+ *
+ * {@code @Rule}
+ * public KafkaContainer kafka = new KafkaContainer();
+ *
+ * {@code @Rule}
+ * public StatefulFunctionsAppContainers myApp =
+ * new StatefulFunctionsAppContainers("app-name", 3)
+ * .dependsOn(kafka);
+ *
+ * ...
+ * }
+ * }</pre>
+ *
+ * <p>Application master and worker containers will always be started after containers that are
+ * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * depended on will also be setup such that they share the same network with the master and workers,
+ * so that they can freely communicate with each other.
+ *
+ * <h2>Prerequisites</h2>
+ *
+ * <p>Since Testcontainers uses Docker, it is required that you have Docker installed for this test
+ * rule to work.
+ *
+ * <p>When building the Docker image for the Stateful Functions application under test, the
+ * following files are added to the build context:
+ *
+ * <uL>
+ * <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
+ * to be present. A simple way is to add the Dockerfile to the test resources directory. This
+ * will be added to the root of the Docker image build context.
+ * <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
+ * any. You can also add this to the test resources directory. This will be added to the root
+ * of the Docker image build context.
+ * <li>All built artifacts under the generated {@code target} folder for the project module that
+ * the test resides in. This is required to be present, so this entails that the tests can
+ * only be ran after artifacts are built. The built artifacts are added to the root of the
+ * Docker image build context.
+ * </uL>
+ */
+public final class StatefulFunctionsAppContainers extends ExternalResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
+
+ private static final String MASTER_HOST = "statefun-app-master";
+ private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+ private final Network network;
+ private final GenericContainer<?> master;
+ private final List<GenericContainer<?>> workers;
+
+ public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+ this(appName, numWorkers, null);
+ }
+
+ public StatefulFunctionsAppContainers(
+ String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
+ if (appName == null || appName.isEmpty()) {
+ throw new IllegalArgumentException(
+ "App name must be non-empty. This is used as the application image name.");
+ }
+ if (numWorkers < 1) {
+ throw new IllegalArgumentException("Must have at least 1 worker.");
+ }
+
+ this.network = Network.newNetwork();
+
+ final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
+ this.master = masterContainer(appImage, network);
+ this.workers = workerContainers(appImage, numWorkers, network);
+ }
+
+ public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
+ container.withNetwork(network);
+ master.dependsOn(container);
+ return this;
+ }
+
+ public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
+ master.withLogConsumer(new Slf4jLogConsumer(logger, true));
+ return this;
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ master.start();
+ workers.forEach(GenericContainer::start);
+ }
+
+ @Override
+ protected void after() {
+ master.stop();
+ workers.forEach(GenericContainer::stop);
+ }
+
+ private static ImageFromDockerfile appImage(
+ String appName, @Nullable Configuration dynamicProperties) {
+ final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
+ LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+ final ImageFromDockerfile appImage =
+ new ImageFromDockerfile(appName)
+ .withFileFromClasspath("Dockerfile", "Dockerfile")
+ .withFileFromPath(".", targetDirPath);
+
+ Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
+ if (flinkConf != null) {
+ appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
+ }
+
+ return appImage;
+ }
+
+ private static @Nullable Configuration loadFlinkConfIfAvailable(
+ @Nullable Configuration dynamicProperties) {
+ final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
+ if (flinkConfUrl == null) {
+ return dynamicProperties;
+ }
+
+ final String flinkConfDir;
+ try {
+ flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Failed to load flink-conf.yaml", e);
+ }
+
+ return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+ }
+
+ private static String flinkConfigAsString(Configuration configuration) {
+ StringBuilder yaml = new StringBuilder();
+ for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+ yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+ }
+
+ return yaml.toString();
+ }
+
+ private static GenericContainer<?> masterContainer(
+ ImageFromDockerfile appImage, Network network) {
+ return new GenericContainer(appImage)
+ .withNetwork(network)
+ .withNetworkAliases(MASTER_HOST)
+ .withEnv("ROLE", "master")
+ .withEnv("MASTER_HOST", MASTER_HOST);
+ }
+
+ private static List<GenericContainer<?>> workerContainers(
+ ImageFromDockerfile appImage, int numWorkers, Network network) {
+ final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+
+ for (int i = 0; i < numWorkers; i++) {
+ workers.add(
+ new GenericContainer(appImage)
+ .withNetwork(network)
+ .withNetworkAliases(workerHostOf(i))
+ .withEnv("ROLE", "worker")
+ .withEnv("MASTER_HOST", MASTER_HOST));
+ }
+
+ return workers;
+ }
+
+ private static String workerHostOf(int workerIndex) {
+ return WORKER_HOST_PREFIX + "-" + workerIndex;
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java
new file mode 100644
index 0000000..764bada
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common.kafka;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * A utility to make test assertions by means of writing inputs to Kafka, and then matching on
+ * outputs read from Kafka.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * KafkaProducer<String, Integer> producer = ...
+ * KafkaConsumer<String, Boolean> consumer = ...
+ *
+ * KafkaIOVerifier<String, Integer, String, Boolean> verifier =
+ * new KafkaIOVerifier(producer, consumer);
+ *
+ * assertThat(
+ * verifier.sending(
+ * new ProducerRecord<>("topic", "key-1", 1991),
+ * new ProducerRecord<>("topic", "key-2", 1108)
+ * ), verifier.resultsInOrder(
+ * true, false, true, true
+ * )
+ * );
+ * }</pre>
+ *
+ * @param <PK> key type of input records written to Kafka
+ * @param <PV> value type of input records written to Kafka
+ * @param <CK> key type of output records read from Kafka
+ * @param <CV> value type of output records read from Kafka
+ */
+public final class KafkaIOVerifier<PK, PV, CK, CV> {
+
+ private final Producer<PK, PV> producer;
+ private final Consumer<CK, CV> consumer;
+
+ /**
+ * Creates a verifier.
+ *
+ * @param producer producer to use to write input records to Kafka.
+ * @param consumer consumer to use for reading output records from Kafka.
+ */
+ public KafkaIOVerifier(Producer<PK, PV> producer, Consumer<CK, CV> consumer) {
+ this.producer = Objects.requireNonNull(producer);
+ this.consumer = Objects.requireNonNull(consumer);
+ }
+
+ /**
+ * Writes to Kafka multiple assertion input producer records, in the given order.
+ *
+ * <p>The results of calling this method should be asserted using {@link
+ * #resultsInOrder(Matcher[])}. In the background, the provided Kafka consumer will be used to
+ * continuously poll output records. For each assertion input provided via this method, you must
+ * consequently use {@link #resultsInOrder(Matcher[])} to complete the assertion, which then stops
+ * the consumer from polling Kafka.
+ *
+ * @param assertionInputs assertion input producer records to send to Kafka.
+ * @return resulting outputs consumed from Kafka that can be asserted using {@link
+ * #resultsInOrder(Matcher[])}.
+ */
+ @SafeVarargs
+ public final OutputsHandoff<CV> sending(ProducerRecord<PK, PV>... assertionInputs) {
+ CompletableFuture.runAsync(
+ () -> {
+ for (ProducerRecord<PK, PV> input : assertionInputs) {
+ producer.send(input);
+ }
+ producer.flush();
+ });
+
+ final OutputsHandoff<CV> outputsHandoff = new OutputsHandoff<>();
+
+ CompletableFuture.runAsync(
+ () -> {
+ while (!outputsHandoff.isVerified()) {
+ ConsumerRecords<CK, CV> consumerRecords = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<CK, CV> record : consumerRecords) {
+ outputsHandoff.add(record.value());
+ }
+ }
+ });
+
+ return outputsHandoff;
+ }
+
+ /**
+ * Matcher for verifying the outputs as a result of calling {@link #sending(ProducerRecord[])}.
+ *
+ * @param expectedResults matchers for the expected results.
+ * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
+ */
+ @SafeVarargs
+ public final Matcher<OutputsHandoff<CV>> resultsInOrder(Matcher<CV>... expectedResults) {
+ return new TypeSafeMatcher<OutputsHandoff<CV>>() {
+ @Override
+ protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
+ try {
+ for (Matcher<CV> r : expectedResults) {
+ CV output = outputHandoff.take();
+ if (!r.matches(output)) {
+ return false;
+ }
+ }
+
+ // any dangling unexpected output should count as a mismatch
+ // TODO should we poll with timeout for a stronger verification?
+ return outputHandoff.peek() == null;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ outputHandoff.verified();
+ }
+ }
+
+ @Override
+ public void describeTo(Description description) {}
+ };
+ }
+
+ private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isVerified;
+
+ boolean isVerified() {
+ return isVerified;
+ }
+
+ void verified() {
+ this.isVerified = true;
+ }
+ }
+}
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java
new file mode 100644
index 0000000..7d4ffa9
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common.kafka;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * A Kafka {@link Serializer} and {@link Deserializer} that uses Protobuf for serialization.
+ *
+ * @param <T> type of the Protobuf message.
+ */
+public final class KafkaProtobufSerializer<T extends Message>
+ implements Serializer<T>, Deserializer<T> {
+
+ private final Parser<T> parser;
+
+ public KafkaProtobufSerializer(Parser<T> parser) {
+ this.parser = Objects.requireNonNull(parser);
+ }
+
+ @Override
+ public byte[] serialize(String s, T command) {
+ return command.toByteArray();
+ }
+
+ @Override
+ public T deserialize(String s, byte[] bytes) {
+ try {
+ return parser.parseFrom(bytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void configure(Map<String, ?> map, boolean b) {}
+}