Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/24 10:03:39 UTC

[flink-statefun] 04/06: [FLINK-16159] [tests] Introduce KafkaIOVerifier

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 96b7a90993e54d17af07cc00e51c7e951f755bb3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Feb 24 16:36:00 2020 +0800

    [FLINK-16159] [tests] Introduce KafkaIOVerifier
    
    A utility to make end-to-end test assertions by means of writing inputs
    to Kafka, and then matching on outputs read from Kafka.
---
 .../sanity/testutils/kafka/KafkaIOVerifier.java    | 161 +++++++++++++++++++++
 1 file changed, 161 insertions(+)

diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
new file mode 100644
index 0000000..899685c
--- /dev/null
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.itcases.sanity.testutils.kafka;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * A utility to make test assertions by means of writing inputs to Kafka, and then matching on
+ * outputs read from Kafka.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * KafkaProducer<String, Integer> producer = ...
+ * KafkaConsumer<String, Boolean> consumer = ...
+ *
+ * KafkaIOVerifier<String, Integer, String, Boolean> verifier =
+ *     new KafkaIOVerifier<>(producer, consumer);
+ *
+ * assertThat(
+ *     verifier.sending(
+ *         new ProducerRecord<>("topic", "key-1", 1991),
+ *         new ProducerRecord<>("topic", "key-2", 1108)
+ *     ), verifier.resultsInOrder(
+ *         is(true), is(false), is(true), is(true)
+ *     )
+ * );
+ * }</pre>
+ *
+ * @param <PK> key type of input records written to Kafka
+ * @param <PV> value type of input records written to Kafka
+ * @param <CK> key type of output records read from Kafka
+ * @param <CV> value type of output records read from Kafka
+ */
+public final class KafkaIOVerifier<PK, PV, CK, CV> {
+
+  private final Producer<PK, PV> producer;
+  private final Consumer<CK, CV> consumer;
+
+  /**
+   * Creates a verifier.
+   *
+   * @param producer producer to use to write input records to Kafka.
+   * @param consumer consumer to use for reading output records from Kafka.
+   */
+  public KafkaIOVerifier(Producer<PK, PV> producer, Consumer<CK, CV> consumer) {
+    this.producer = Objects.requireNonNull(producer);
+    this.consumer = Objects.requireNonNull(consumer);
+  }
+
+  /**
+   * Writes multiple assertion input producer records to Kafka, in the given order.
+   *
+   * <p>The result of calling this method should be asserted using {@link
+   * #resultsInOrder(Matcher[])}. In the background, the provided Kafka consumer continuously polls
+   * for output records. For each call to this method, you must subsequently use {@link
+   * #resultsInOrder(Matcher[])} to complete the assertion, which stops the consumer from polling
+   * Kafka.
+   *
+   * @param assertionInputs assertion input producer records to send to Kafka.
+   * @return resulting outputs consumed from Kafka that can be asserted using {@link
+   *     #resultsInOrder(Matcher[])}.
+   */
+  @SafeVarargs
+  public final OutputsHandoff<CV> sending(ProducerRecord<PK, PV>... assertionInputs) {
+    CompletableFuture.runAsync(
+        () -> {
+          for (ProducerRecord<PK, PV> input : assertionInputs) {
+            producer.send(input);
+          }
+          producer.flush();
+        });
+
+    final OutputsHandoff<CV> outputsHandoff = new OutputsHandoff<>();
+
+    CompletableFuture.runAsync(
+        () -> {
+          while (!outputsHandoff.isVerified()) {
+            ConsumerRecords<CK, CV> consumerRecords = consumer.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<CK, CV> record : consumerRecords) {
+              outputsHandoff.add(record.value());
+            }
+          }
+        });
+
+    return outputsHandoff;
+  }
+
+  /**
+   * Creates a matcher for verifying the outputs of a call to {@link #sending(ProducerRecord[])}.
+   *
+   * @param expectedResults matchers for the expected results, in the expected order.
+   * @return a matcher for verifying the outputs of calling {@link #sending(ProducerRecord[])}.
+   */
+  @SafeVarargs
+  public final Matcher<OutputsHandoff<CV>> resultsInOrder(Matcher<CV>... expectedResults) {
+    return new TypeSafeMatcher<OutputsHandoff<CV>>() {
+      @Override
+      protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
+        try {
+          for (Matcher<CV> r : expectedResults) {
+            CV output = outputHandoff.take();
+            if (!r.matches(output)) {
+              return false;
+            }
+          }
+
+          // any dangling unexpected output should count as a mismatch
+          // TODO should we poll with timeout for a stronger verification?
+          return outputHandoff.peek() == null;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        } finally {
+          outputHandoff.verified();
+        }
+      }
+
+      @Override
+      public void describeTo(Description description) {}
+    };
+  }
+
+  private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
+    private volatile boolean isVerified;
+
+    boolean isVerified() {
+      return isVerified;
+    }
+
+    void verified() {
+      this.isVerified = true;
+    }
+  }
+}
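
For reference, a minimal sketch of how this verifier might be wired into a test. It is not part of the commit: the broker address, topic names, group id, serializer/deserializer choices, expected output values, and the test class itself are illustrative assumptions; only KafkaIOVerifier, sending(...), and resultsInOrder(...) come from the code above.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import java.util.Collections;
import java.util.Properties;
import org.apache.flink.statefun.itcases.sanity.testutils.kafka.KafkaIOVerifier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

public class ExampleKafkaIOVerifierUsageTest {

  @Test
  public void exampleRoundTrip() {
    // Producer for writing assertion inputs; broker address and serializers are assumed.
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    Producer<String, Integer> producer = new KafkaProducer<>(producerProps);

    // Consumer for reading outputs; topic name, group id, and deserializers are assumed.
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-io-verifier-example");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("outputs"));

    KafkaIOVerifier<String, Integer, String, String> verifier =
        new KafkaIOVerifier<>(producer, consumer);

    // Each sending(...) call is completed by a resultsInOrder(...) assertion, which also
    // stops the background consumer loop that sending(...) starts. The expected values
    // below are placeholders for whatever the system under test writes to the output topic.
    assertThat(
        verifier.sending(
            new ProducerRecord<>("inputs", "key-1", 1991),
            new ProducerRecord<>("inputs", "key-2", 1108)),
        verifier.resultsInOrder(is("result-1"), is("result-2")));
  }
}

Note that sending(...) and resultsInOrder(...) should always be used as a pair, since the background consumer loop only terminates once the matcher returned by resultsInOrder(...) has been evaluated.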