Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:48 UTC

[flink-statefun] 04/06: [FLINK-16272] [e2e] Use e2e common module in sanity verification test

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 78117b8d96c0ad9b27bf971d3a472c750d84a1d6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 15:10:27 2020 +0800

    [FLINK-16272] [e2e] Use e2e common module in sanity verification test
    
    This removes from the sanity verification test all classes and
    dependencies that are now provided by the e2e common module.
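    
    For context, the common module's classes are drop-in replacements for
    the removed local ones. A minimal sketch of the resulting test wiring
    (assuming the common module keeps the API of the classes deleted below;
    the bootstrap address, `command`, and `expectedSnapshot` are
    illustrative placeholders, and consumer group settings plus
    subscribe() are elided):
    
        // imports as in the updated SanityVerificationITCase below
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
    
        // KafkaProtobufSerializer wraps a Protobuf Parser<T> and serves as
        // both the Kafka Serializer and Deserializer for that message type.
        Producer<FnAddress, Command> producer =
            new KafkaProducer<>(
                props,
                new KafkaProtobufSerializer<>(FnAddress.parser()),
                new KafkaProtobufSerializer<>(Command.parser()));
        Consumer<FnAddress, StateSnapshot> consumer =
            new KafkaConsumer<>(
                props,
                new KafkaProtobufSerializer<>(FnAddress.parser()),
                new KafkaProtobufSerializer<>(StateSnapshot.parser()));
    
        // KafkaIOVerifier writes the inputs to Kafka, polls the consumer in
        // the background, and matches the consumed outputs in order.
        KafkaIOVerifier<FnAddress, Command, FnAddress, StateSnapshot> verifier =
            new KafkaIOVerifier<>(producer, consumer);
    
        assertThat(
            verifier.sending(producerRecord(command)),        // illustrative input
            verifier.resultsInOrder(is(expectedSnapshot)));   // illustrative output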
---
 .../statefun-sanity-itcase/pom.xml                 |  20 +--
 .../itcases/sanity/SanityVerificationITCase.java   |  51 ++-----
 .../sanity/testutils/kafka/KafkaIOVerifier.java    | 161 ---------------------
 3 files changed, 15 insertions(+), 217 deletions(-)

diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
index ab5d842..f670411 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
@@ -66,23 +66,15 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-        <!-- Testcontainers -->
+        <!-- End-to-end test common -->
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
-            <version>${testcontainers.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-end-to-end-tests-common</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-compress</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
index d383fe8..0d72b29 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
@@ -21,28 +21,23 @@ package org.apache.flink.statefun.itcases.sanity;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
 import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.itcases.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.itcases.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.itcases.common.kafka.KafkaProtobufSerializer;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Noop;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Send;
 import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.flink.statefun.itcases.sanity.testutils.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.itcases.sanity.testutils.kafka.KafkaIOVerifier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -106,7 +101,7 @@ public class SanityVerificationITCase {
   }
 
   // =================================================================================
-  //  Kafka IO utility classes and methods
+  //  Kafka IO utility methods
   // =================================================================================
 
   private static Producer<FnAddress, Command> kafkaCommandProducer(String bootstrapServers) {
@@ -114,7 +109,9 @@ public class SanityVerificationITCase {
     props.put("bootstrap.servers", bootstrapServers);
 
     return new KafkaProducer<>(
-        props, new ProtobufSerDe<>(FnAddress.parser()), new ProtobufSerDe<>(Command.parser()));
+        props,
+        new KafkaProtobufSerializer<>(FnAddress.parser()),
+        new KafkaProtobufSerializer<>(Command.parser()));
   }
 
   private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
@@ -127,43 +124,13 @@ public class SanityVerificationITCase {
     KafkaConsumer<FnAddress, StateSnapshot> consumer =
         new KafkaConsumer<>(
             consumerProps,
-            new ProtobufSerDe<>(FnAddress.parser()),
-            new ProtobufSerDe<>(StateSnapshot.parser()));
+            new KafkaProtobufSerializer<>(FnAddress.parser()),
+            new KafkaProtobufSerializer<>(StateSnapshot.parser()));
     consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
 
     return consumer;
   }
 
-  private static final class ProtobufSerDe<T extends Message>
-      implements Serializer<T>, Deserializer<T> {
-
-    private final Parser<T> parser;
-
-    ProtobufSerDe(Parser<T> parser) {
-      this.parser = Objects.requireNonNull(parser);
-    }
-
-    @Override
-    public byte[] serialize(String s, T command) {
-      return command.toByteArray();
-    }
-
-    @Override
-    public T deserialize(String s, byte[] bytes) {
-      try {
-        return parser.parseFrom(bytes);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void close() {}
-
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {}
-  }
-
   private static ProducerRecord<FnAddress, Command> producerRecord(Command command) {
     return new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, command.getTarget(), command);
   }
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
deleted file mode 100644
index 899685c..0000000
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.itcases.sanity.testutils.kafka;
-
-import java.time.Duration;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-
-/**
- * A utility to make test assertions by means of writing inputs to Kafka, and then matching on
- * outputs read from Kafka.
- *
- * <p>Example usage:
- *
- * <pre>{@code
- * KafkaProducer<String, Integer> producer = ...
- * KafkaConsumer<String, Boolean> consumer = ...
- *
- * KafkaIOVerifier<String, Integer, String, Boolean> verifier =
- *     new KafkaIOVerifier(producer, consumer);
- *
- * assertThat(
- *     verifier.sending(
- *         new ProducerRecord<>("topic", "key-1", 1991),
- *         new ProducerRecord<>("topic", "key-2", 1108)
- *     ), verifier.resultsInOrder(
- *         true, false, true, true
- *     )
- * );
- * }</pre>
- *
- * @param <PK> key type of input records written to Kafka
- * @param <PV> value type of input records written to Kafka
- * @param <CK> key type of output records read from Kafka
- * @param <CV> value type of output records read from Kafka
- */
-public final class KafkaIOVerifier<PK, PV, CK, CV> {
-
-  private final Producer<PK, PV> producer;
-  private final Consumer<CK, CV> consumer;
-
-  /**
-   * Creates a verifier.
-   *
-   * @param producer producer to use to write input records to Kafka.
-   * @param consumer consumer to use for reading output records from Kafka.
-   */
-  public KafkaIOVerifier(Producer<PK, PV> producer, Consumer<CK, CV> consumer) {
-    this.producer = Objects.requireNonNull(producer);
-    this.consumer = Objects.requireNonNull(consumer);
-  }
-
-  /**
-   * Writes to Kafka multiple assertion input producer records, in the given order.
-   *
-   * <p>The results of calling this method should be asserted using {@link
-   * #resultsInOrder(Matcher[])}. In the background, the provided Kafka consumer will be used to
-   * continuously poll output records. For each assertion input provided via this method, you must
-   * consequently use {@link #resultsInOrder(Matcher[])} to complete the assertion, which then stops
-   * the consumer from polling Kafka.
-   *
-   * @param assertionInputs assertion input producer records to send to Kafka.
-   * @return resulting outputs consumed from Kafka that can be asserted using {@link
-   *     #resultsInOrder(Matcher[])}.
-   */
-  @SafeVarargs
-  public final OutputsHandoff<CV> sending(ProducerRecord<PK, PV>... assertionInputs) {
-    CompletableFuture.runAsync(
-        () -> {
-          for (ProducerRecord<PK, PV> input : assertionInputs) {
-            producer.send(input);
-          }
-          producer.flush();
-        });
-
-    final OutputsHandoff<CV> outputsHandoff = new OutputsHandoff<>();
-
-    CompletableFuture.runAsync(
-        () -> {
-          while (!outputsHandoff.isVerified()) {
-            ConsumerRecords<CK, CV> consumerRecords = consumer.poll(Duration.ofMillis(100));
-            for (ConsumerRecord<CK, CV> record : consumerRecords) {
-              outputsHandoff.add(record.value());
-            }
-          }
-        });
-
-    return outputsHandoff;
-  }
-
-  /**
-   * Matcher for verifying the outputs as a result of calling {@link #sending(ProducerRecord[])}.
-   *
-   * @param expectedResults matchers for the expected results.
-   * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
-   */
-  @SafeVarargs
-  public final Matcher<OutputsHandoff<CV>> resultsInOrder(Matcher<CV>... expectedResults) {
-    return new TypeSafeMatcher<OutputsHandoff<CV>>() {
-      @Override
-      protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
-        try {
-          for (Matcher<CV> r : expectedResults) {
-            CV output = outputHandoff.take();
-            if (!r.matches(output)) {
-              return false;
-            }
-          }
-
-          // any dangling unexpected output should count as a mismatch
-          // TODO should we poll with timeout for a stronger verification?
-          return outputHandoff.peek() == null;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        } finally {
-          outputHandoff.verified();
-        }
-      }
-
-      @Override
-      public void describeTo(Description description) {}
-    };
-  }
-
-  private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
-    private volatile boolean isVerified;
-
-    boolean isVerified() {
-      return isVerified;
-    }
-
-    void verified() {
-      this.isVerified = true;
-    }
-  }
-}