Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:47 UTC

[flink-statefun] 03/06: [FLINK-16272] [e2e] Introduce statefun-end-to-end-tests-common module

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 5ffa36ed9c1aca40109e9cf13c4a82b4605b3657
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 15:08:02 2020 +0800

    [FLINK-16272] [e2e] Introduce statefun-end-to-end-tests-common module
    
    All common end-to-end test utilities should be in this new module.
    Currently, it only has:
    * StatefulFunctionsAppContainers
    * KafkaIOVerifier
    * KafkaProtobufSerializer
---
 statefun-end-to-end-tests/pom.xml                  |   1 +
 .../statefun-end-to-end-tests-common/pom.xml       |  83 ++++++++
 .../common/StatefulFunctionsAppContainers.java     | 236 +++++++++++++++++++++
 .../itcases/common/kafka/KafkaIOVerifier.java      | 164 ++++++++++++++
 .../common/kafka/KafkaProtobufSerializer.java      |  61 ++++++
 5 files changed, 545 insertions(+)

diff --git a/statefun-end-to-end-tests/pom.xml b/statefun-end-to-end-tests/pom.xml
index 5a79d9f..2a4b0ce 100644
--- a/statefun-end-to-end-tests/pom.xml
+++ b/statefun-end-to-end-tests/pom.xml
@@ -29,6 +29,7 @@ under the License.
     <packaging>pom</packaging>
 
     <modules>
+        <module>statefun-end-to-end-tests-common</module>
         <module>statefun-sanity-itcase</module>
     </modules>
 
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml
new file mode 100644
index 0000000..9cf6798
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-end-to-end-tests-common</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+        <flink.version>1.10.0</flink.version>
+        <kafka.version>2.2.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Testcontainers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Flink Config -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Kafka client -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java
new file mode 100644
index 0000000..b101fcd
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/StatefulFunctionsAppContainers.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+/**
+ * A JUnit {@link org.junit.rules.TestRule} that sets up a containerized Stateful Functions
+ * application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This makes
+ * composing end-to-end tests for Stateful Functions applications easier by managing the
+ * containerized application as an external test resource whose lifecycle is integrated with the
+ * JUnit test framework.
+ *
+ * <h2>Example usage</h2>
+ *
+ * <pre>{@code
+ * public class MyITCase {
+ *
+ *     @Rule
+ *     public StatefulFunctionsAppContainers myApp =
+ *         new StatefulFunctionsAppContainers("app-name", 3);
+ *
+ *     @Test
+ *     public void runTest() {
+ *         // the containers for the app, including master and workers, will already be running
+ *         // before the test is run; implement your test logic against the app
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p>In most cases you'd also need to start an additional system for the test, for example a
+ * container that runs Kafka, which the application depends on as an ingress or egress. The
+ * following demonstrates adding a Kafka container to the setup:
+ *
+ * <pre>{@code
+ * public class MyKafkaITCase {
+ *
+ *     @Rule
+ *     public KafkaContainer kafka = new KafkaContainer();
+ *
+ *     @Rule
+ *     public StatefulFunctionsAppContainers myApp =
+ *         new StatefulFunctionsAppContainers("app-name", 3)
+ *             .dependsOn(kafka);
+ *
+ *     ...
+ * }
+ * }</pre>
+ *
+ * <p>Application master and worker containers will always be started after containers that are
+ * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * depended on are also set up to share the same network with the master and workers, so that
+ * they can freely communicate with each other.
+ *
+ * <h2>Prerequisites</h2>
+ *
+ * <p>Since Testcontainers uses Docker, you must have Docker installed for this test rule to
+ * work.
+ *
+ * <p>When building the Docker image for the Stateful Functions application under test, the
+ * following files are added to the build context:
+ *
+ * <ul>
+ *   <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
+ *       to be present. A simple way is to add the Dockerfile to the test resources directory. This
+ *       will be added to the root of the Docker image build context.
+ *   <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
+ *       any. You can also add this to the test resources directory. This will be added to the root
+ *       of the Docker image build context.
+ *   <li>All built artifacts under the generated {@code target} folder for the project module that
+ *       the test resides in. This is required to be present, which means the tests can only be
+ *       run after the artifacts are built. The built artifacts are added to the root of the
+ *       Docker image build context.
+ * </ul>
+ */
+public final class StatefulFunctionsAppContainers extends ExternalResource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
+
+  private static final String MASTER_HOST = "statefun-app-master";
+  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+  private final Network network;
+  private final GenericContainer<?> master;
+  private final List<GenericContainer<?>> workers;
+
+  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
+    this(appName, numWorkers, null);
+  }
+
+  public StatefulFunctionsAppContainers(
+      String appName, int numWorkers, @Nullable Configuration dynamicProperties) {
+    if (appName == null || appName.isEmpty()) {
+      throw new IllegalArgumentException(
+          "App name must be non-empty. This is used as the application image name.");
+    }
+    if (numWorkers < 1) {
+      throw new IllegalArgumentException("Must have at least 1 worker.");
+    }
+
+    this.network = Network.newNetwork();
+
+    final ImageFromDockerfile appImage = appImage(appName, dynamicProperties);
+    this.master = masterContainer(appImage, network);
+    this.workers = workerContainers(appImage, numWorkers, network);
+  }
+
+  public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) {
+    container.withNetwork(network);
+    master.dependsOn(container);
+    return this;
+  }
+
+  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
+    master.withLogConsumer(new Slf4jLogConsumer(logger, true));
+    return this;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    master.start();
+    workers.forEach(GenericContainer::start);
+  }
+
+  @Override
+  protected void after() {
+    master.stop();
+    workers.forEach(GenericContainer::stop);
+  }
+
+  private static ImageFromDockerfile appImage(
+      String appName, @Nullable Configuration dynamicProperties) {
+    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
+    LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
+
+    final ImageFromDockerfile appImage =
+        new ImageFromDockerfile(appName)
+            .withFileFromClasspath("Dockerfile", "Dockerfile")
+            .withFileFromPath(".", targetDirPath);
+
+    Configuration flinkConf = loadFlinkConfIfAvailable(dynamicProperties);
+    if (flinkConf != null) {
+      appImage.withFileFromString("flink-conf.yaml", flinkConfigAsString(flinkConf));
+    }
+
+    return appImage;
+  }
+
+  private static @Nullable Configuration loadFlinkConfIfAvailable(
+      @Nullable Configuration dynamicProperties) {
+    final URL flinkConfUrl = StatefulFunctionsAppContainers.class.getResource("/flink-conf.yaml");
+    if (flinkConfUrl == null) {
+      return dynamicProperties;
+    }
+
+    final String flinkConfDir;
+    try {
+      flinkConfDir = new File(flinkConfUrl.toURI()).getParentFile().getAbsolutePath();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Failed to load flink-conf.yaml", e);
+    }
+
+    return GlobalConfiguration.loadConfiguration(flinkConfDir, dynamicProperties);
+  }
+
+  private static String flinkConfigAsString(Configuration configuration) {
+    StringBuilder yaml = new StringBuilder();
+    for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
+      yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
+    }
+
+    return yaml.toString();
+  }
+
+  private static GenericContainer<?> masterContainer(
+      ImageFromDockerfile appImage, Network network) {
+    return new GenericContainer<>(appImage)
+        .withNetwork(network)
+        .withNetworkAliases(MASTER_HOST)
+        .withEnv("ROLE", "master")
+        .withEnv("MASTER_HOST", MASTER_HOST);
+  }
+
+  private static List<GenericContainer<?>> workerContainers(
+      ImageFromDockerfile appImage, int numWorkers, Network network) {
+    final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+
+    for (int i = 0; i < numWorkers; i++) {
+      workers.add(
+          new GenericContainer<>(appImage)
+              .withNetwork(network)
+              .withNetworkAliases(workerHostOf(i))
+              .withEnv("ROLE", "worker")
+              .withEnv("MASTER_HOST", MASTER_HOST));
+    }
+
+    return workers;
+  }
+
+  private static String workerHostOf(int workerIndex) {
+    return WORKER_HOST_PREFIX + "-" + workerIndex;
+  }
+}
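
The Javadoc above covers the basic usage; the commit also adds a three-argument
constructor that accepts dynamic Flink configuration properties (merged into the
flink-conf.yaml baked into the app image), plus the exposeMasterLogs() hook. Below
is a minimal sketch of a test using both; the test class name, logger, and the
chosen config key are illustrative assumptions, not part of this commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.statefun.itcases.common.StatefulFunctionsAppContainers;
    import org.junit.Rule;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyConfiguredITCase {

        private static final Logger LOG = LoggerFactory.getLogger(MyConfiguredITCase.class);

        // dynamic properties end up in the flink-conf.yaml written into the app image
        private static Configuration dynamicProperties() {
            Configuration conf = new Configuration();
            conf.setString("parallelism.default", "2"); // any Flink config key works
            return conf;
        }

        @Rule
        public StatefulFunctionsAppContainers myApp =
            new StatefulFunctionsAppContainers("app-name", 2, dynamicProperties())
                .exposeMasterLogs(LOG); // pipe master container logs to the test logger

        @Test
        public void runTest() {
            // master and workers are already running here
        }
    }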
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java
new file mode 100644
index 0000000..764bada
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaIOVerifier.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common.kafka;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * A utility for making test assertions by writing inputs to Kafka and then matching on the
+ * outputs read from Kafka.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * KafkaProducer<String, Integer> producer = ...
+ * KafkaConsumer<String, Boolean> consumer = ...
+ *
+ * KafkaIOVerifier<String, Integer, String, Boolean> verifier =
+ *     new KafkaIOVerifier<>(producer, consumer);
+ *
+ * assertThat(
+ *     verifier.sending(
+ *         new ProducerRecord<>("topic", "key-1", 1991),
+ *         new ProducerRecord<>("topic", "key-2", 1108)
+ *     ), verifier.resultsInOrder(
+ *         true, false, true, true
+ *     )
+ * );
+ * }</pre>
+ *
+ * @param <PK> key type of input records written to Kafka
+ * @param <PV> value type of input records written to Kafka
+ * @param <CK> key type of output records read from Kafka
+ * @param <CV> value type of output records read from Kafka
+ */
+public final class KafkaIOVerifier<PK, PV, CK, CV> {
+
+  private final Producer<PK, PV> producer;
+  private final Consumer<CK, CV> consumer;
+
+  /**
+   * Creates a verifier.
+   *
+   * @param producer producer to use to write input records to Kafka.
+   * @param consumer consumer to use for reading output records from Kafka.
+   */
+  public KafkaIOVerifier(Producer<PK, PV> producer, Consumer<CK, CV> consumer) {
+    this.producer = Objects.requireNonNull(producer);
+    this.consumer = Objects.requireNonNull(consumer);
+  }
+
+  /**
+   * Writes multiple assertion input producer records to Kafka, in the given order.
+   *
+   * <p>The results of calling this method should be asserted using {@link
+   * #resultsInOrder(Matcher[])}. In the background, the provided Kafka consumer will be used to
+   * continuously poll output records. For each set of assertion inputs provided via this method,
+   * you must subsequently use {@link #resultsInOrder(Matcher[])} to complete the assertion, which
+   * then stops the consumer from polling Kafka.
+   *
+   * @param assertionInputs assertion input producer records to send to Kafka.
+   * @return resulting outputs consumed from Kafka that can be asserted using {@link
+   *     #resultsInOrder(Matcher[])}.
+   */
+  @SafeVarargs
+  public final OutputsHandoff<CV> sending(ProducerRecord<PK, PV>... assertionInputs) {
+    CompletableFuture.runAsync(
+        () -> {
+          for (ProducerRecord<PK, PV> input : assertionInputs) {
+            producer.send(input);
+          }
+          producer.flush();
+        });
+
+    final OutputsHandoff<CV> outputsHandoff = new OutputsHandoff<>();
+
+    CompletableFuture.runAsync(
+        () -> {
+          while (!outputsHandoff.isVerified()) {
+            ConsumerRecords<CK, CV> consumerRecords = consumer.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<CK, CV> record : consumerRecords) {
+              outputsHandoff.add(record.value());
+            }
+          }
+        });
+
+    return outputsHandoff;
+  }
+
+  /**
+   * Matcher for verifying the outputs as a result of calling {@link #sending(ProducerRecord[])}.
+   *
+   * @param expectedResults matchers for the expected results.
+   * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
+   */
+  @SafeVarargs
+  public final Matcher<OutputsHandoff<CV>> resultsInOrder(Matcher<CV>... expectedResults) {
+    return new TypeSafeMatcher<OutputsHandoff<CV>>() {
+      @Override
+      protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
+        try {
+          for (Matcher<CV> r : expectedResults) {
+            CV output = outputHandoff.take();
+            if (!r.matches(output)) {
+              return false;
+            }
+          }
+
+          // any dangling unexpected output should count as a mismatch
+          // TODO should we poll with timeout for a stronger verification?
+          return outputHandoff.peek() == null;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        } finally {
+          outputHandoff.verified();
+        }
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        // give assertion failures a readable expectation instead of a blank description
+        description.appendText("Kafka outputs matching the expected results, in order");
+      }
+    };
+  }
+
+  private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private volatile boolean isVerified;
+
+    boolean isVerified() {
+      return isVerified;
+    }
+
+    void verified() {
+      this.isVerified = true;
+    }
+  }
+}
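
For reference, a minimal sketch of wiring the verifier together with a Testcontainers
KafkaContainer and the KafkaProtobufSerializer introduced in this commit. The Protobuf
messages Command and Result, the topic names, and the consumer group are hypothetical
placeholders for whatever a concrete test defines:

    import static org.hamcrest.CoreMatchers.is;
    import static org.junit.Assert.assertThat;

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", kafka.getBootstrapServers());
    KafkaProducer<String, Command> producer =
        new KafkaProducer<>(
            producerProps, new StringSerializer(), new KafkaProtobufSerializer<>(Command.parser()));

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", kafka.getBootstrapServers());
    consumerProps.put("group.id", "verifier");
    consumerProps.put("auto.offset.reset", "earliest");
    KafkaConsumer<String, Result> consumer =
        new KafkaConsumer<>(
            consumerProps, new StringDeserializer(), new KafkaProtobufSerializer<>(Result.parser()));
    consumer.subscribe(Collections.singletonList("results"));

    KafkaIOVerifier<String, Command, String, Result> verifier =
        new KafkaIOVerifier<>(producer, consumer);

    assertThat(
        verifier.sending(new ProducerRecord<>("commands", "key-1", someCommand)),
        verifier.resultsInOrder(is(expectedResult)));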
diff --git a/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java
new file mode 100644
index 0000000..7d4ffa9
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-end-to-end-tests-common/src/main/java/org/apache/flink/statefun/itcases/common/kafka/KafkaProtobufSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.common.kafka;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * A Kafka {@link Serializer} and {@link Deserializer} that uses Protobuf for serialization.
+ *
+ * @param <T> type of the Protobuf message.
+ */
+public final class KafkaProtobufSerializer<T extends Message>
+    implements Serializer<T>, Deserializer<T> {
+
+  private final Parser<T> parser;
+
+  public KafkaProtobufSerializer(Parser<T> parser) {
+    this.parser = Objects.requireNonNull(parser);
+  }
+
+  @Override
+  public byte[] serialize(String s, T command) {
+    return command.toByteArray();
+  }
+
+  @Override
+  public T deserialize(String s, byte[] bytes) {
+    try {
+      return parser.parseFrom(bytes);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {}
+}
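
Since the class implements both Serializer and Deserializer, one instance can serve as a
producer's value serializer and a consumer's value deserializer at the same time. A quick
round-trip sketch, assuming a hypothetical generated Protobuf message Invoke with a string
field target:

    KafkaProtobufSerializer<Invoke> serde = new KafkaProtobufSerializer<>(Invoke.parser());

    Invoke original = Invoke.newBuilder().setTarget("greeter").build();
    byte[] bytes = serde.serialize("any-topic", original);       // the topic argument is unused
    Invoke roundTripped = serde.deserialize("any-topic", bytes);

    assert original.equals(roundTripped); // Protobuf messages have value-based equality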