You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/26 07:55:48 UTC
[flink-statefun] 04/06: [FLINK-16272] [e2e] Use e2e common module
in sanity verification test
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 78117b8d96c0ad9b27bf971d3a472c750d84a1d6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Feb 25 15:10:27 2020 +0800
[FLINK-16272] [e2e] Use e2e common module in sanity verification test
This removes all classes and dependencies that are now provided by the
e2e commons module from the sanity verification test.
---
.../statefun-sanity-itcase/pom.xml | 20 +--
.../itcases/sanity/SanityVerificationITCase.java | 51 ++-----
.../sanity/testutils/kafka/KafkaIOVerifier.java | 161 ---------------------
3 files changed, 15 insertions(+), 217 deletions(-)
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
index ab5d842..f670411 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/pom.xml
@@ -66,23 +66,15 @@ under the License.
<scope>test</scope>
</dependency>
- <!-- Testcontainers -->
+ <!-- End-to-end test common -->
<dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
- <version>${testcontainers.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-end-to-end-tests-common</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- </exclusions>
</dependency>
+
+ <!-- Testcontainers KafkaContainer -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
index d383fe8..0d72b29 100644
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
+++ b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/SanityVerificationITCase.java
@@ -21,28 +21,23 @@ package org.apache.flink.statefun.itcases.sanity;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statefun.itcases.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.itcases.common.kafka.KafkaIOVerifier;
+import org.apache.flink.statefun.itcases.common.kafka.KafkaProtobufSerializer;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Command;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.FnAddress;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Modify;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Noop;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.Send;
import org.apache.flink.statefun.itcases.sanity.generated.VerificationMessages.StateSnapshot;
-import org.apache.flink.statefun.itcases.sanity.testutils.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.itcases.sanity.testutils.kafka.KafkaIOVerifier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
@@ -106,7 +101,7 @@ public class SanityVerificationITCase {
}
// =================================================================================
- // Kafka IO utility classes and methods
+ // Kafka IO utility methods
// =================================================================================
private static Producer<FnAddress, Command> kafkaCommandProducer(String bootstrapServers) {
@@ -114,7 +109,9 @@ public class SanityVerificationITCase {
props.put("bootstrap.servers", bootstrapServers);
return new KafkaProducer<>(
- props, new ProtobufSerDe<>(FnAddress.parser()), new ProtobufSerDe<>(Command.parser()));
+ props,
+ new KafkaProtobufSerializer<>(FnAddress.parser()),
+ new KafkaProtobufSerializer<>(Command.parser()));
}
private static Consumer<FnAddress, StateSnapshot> kafkaStateSnapshotConsumer(
@@ -127,43 +124,13 @@ public class SanityVerificationITCase {
KafkaConsumer<FnAddress, StateSnapshot> consumer =
new KafkaConsumer<>(
consumerProps,
- new ProtobufSerDe<>(FnAddress.parser()),
- new ProtobufSerDe<>(StateSnapshot.parser()));
+ new KafkaProtobufSerializer<>(FnAddress.parser()),
+ new KafkaProtobufSerializer<>(StateSnapshot.parser()));
consumer.subscribe(Collections.singletonList(KafkaIO.STATE_SNAPSHOTS_TOPIC_NAME));
return consumer;
}
- private static final class ProtobufSerDe<T extends Message>
- implements Serializer<T>, Deserializer<T> {
-
- private final Parser<T> parser;
-
- ProtobufSerDe(Parser<T> parser) {
- this.parser = Objects.requireNonNull(parser);
- }
-
- @Override
- public byte[] serialize(String s, T command) {
- return command.toByteArray();
- }
-
- @Override
- public T deserialize(String s, byte[] bytes) {
- try {
- return parser.parseFrom(bytes);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {}
-
- @Override
- public void configure(Map<String, ?> map, boolean b) {}
- }
-
private static ProducerRecord<FnAddress, Command> producerRecord(Command command) {
return new ProducerRecord<>(KafkaIO.COMMAND_TOPIC_NAME, command.getTarget(), command);
}
diff --git a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java b/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
deleted file mode 100644
index 899685c..0000000
--- a/statefun-end-to-end-tests/statefun-sanity-itcase/src/test/java/org/apache/flink/statefun/itcases/sanity/testutils/kafka/KafkaIOVerifier.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.itcases.sanity.testutils.kafka;
-
-import java.time.Duration;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-
-/**
- * A utility to make test assertions by means of writing inputs to Kafka, and then matching on
- * outputs read from Kafka.
- *
- * <p>Example usage:
- *
- * <pre>{@code
- * KafkaProducer<String, Integer> producer = ...
- * KafkaConsumer<String, Boolean> consumer = ...
- *
- * KafkaIOVerifier<String, Integer, String, Boolean> verifier =
- * new KafkaIOVerifier(producer, consumer);
- *
- * assertThat(
- * verifier.sending(
- * new ProducerRecord<>("topic", "key-1", 1991),
- * new ProducerRecord<>("topic", "key-2", 1108)
- * ), verifier.resultsInOrder(
- * true, false, true, true
- * )
- * );
- * }</pre>
- *
- * @param <PK> key type of input records written to Kafka
- * @param <PV> value type of input records written to Kafka
- * @param <CK> key type of output records read from Kafka
- * @param <CV> value type of output records read from Kafka
- */
-public final class KafkaIOVerifier<PK, PV, CK, CV> {
-
- private final Producer<PK, PV> producer;
- private final Consumer<CK, CV> consumer;
-
- /**
- * Creates a verifier.
- *
- * @param producer producer to use to write input records to Kafka.
- * @param consumer consumer to use for reading output records from Kafka.
- */
- public KafkaIOVerifier(Producer<PK, PV> producer, Consumer<CK, CV> consumer) {
- this.producer = Objects.requireNonNull(producer);
- this.consumer = Objects.requireNonNull(consumer);
- }
-
- /**
- * Writes to Kafka multiple assertion input producer records, in the given order.
- *
- * <p>The results of calling this method should be asserted using {@link
- * #resultsInOrder(Matcher[])}. In the background, the provided Kafka consumer will be used to
- * continuously poll output records. For each assertion input provided via this method, you must
- * consequently use {@link #resultsInOrder(Matcher[])} to complete the assertion, which then stops
- * the consumer from polling Kafka.
- *
- * @param assertionInputs assertion input producer records to send to Kafka.
- * @return resulting outputs consumed from Kafka that can be asserted using {@link
- * #resultsInOrder(Matcher[])}.
- */
- @SafeVarargs
- public final OutputsHandoff<CV> sending(ProducerRecord<PK, PV>... assertionInputs) {
- CompletableFuture.runAsync(
- () -> {
- for (ProducerRecord<PK, PV> input : assertionInputs) {
- producer.send(input);
- }
- producer.flush();
- });
-
- final OutputsHandoff<CV> outputsHandoff = new OutputsHandoff<>();
-
- CompletableFuture.runAsync(
- () -> {
- while (!outputsHandoff.isVerified()) {
- ConsumerRecords<CK, CV> consumerRecords = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<CK, CV> record : consumerRecords) {
- outputsHandoff.add(record.value());
- }
- }
- });
-
- return outputsHandoff;
- }
-
- /**
- * Matcher for verifying the outputs as a result of calling {@link #sending(ProducerRecord[])}.
- *
- * @param expectedResults matchers for the expected results.
- * @return a matcher for verifying the output of calling {@link #sending(ProducerRecord[])}.
- */
- @SafeVarargs
- public final Matcher<OutputsHandoff<CV>> resultsInOrder(Matcher<CV>... expectedResults) {
- return new TypeSafeMatcher<OutputsHandoff<CV>>() {
- @Override
- protected boolean matchesSafely(OutputsHandoff<CV> outputHandoff) {
- try {
- for (Matcher<CV> r : expectedResults) {
- CV output = outputHandoff.take();
- if (!r.matches(output)) {
- return false;
- }
- }
-
- // any dangling unexpected output should count as a mismatch
- // TODO should we poll with timeout for a stronger verification?
- return outputHandoff.peek() == null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- outputHandoff.verified();
- }
- }
-
- @Override
- public void describeTo(Description description) {}
- };
- }
-
- private static final class OutputsHandoff<T> extends LinkedBlockingQueue<T> {
- private volatile boolean isVerified;
-
- boolean isVerified() {
- return isVerified;
- }
-
- void verified() {
- this.isVerified = true;
- }
- }
-}