Posted to commits@flink.apache.org by le...@apache.org on 2022/02/16 01:31:56 UTC
[flink] 01/02: [FLINK-25289][tests] Introduce sink test suite in connector test framework
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57e3f03ccd719ed772c983ba335517d95f8f3e6a
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Tue Feb 15 20:15:21 2022 +0800
[FLINK-25289][tests] Introduce sink test suite in connector test framework
This closes #18496.
---
.../connector/kafka/sink/KafkaSinkITCase.java | 46 ++
.../kafka/sink/testutils/KafkaDataReader.java | 69 +++
.../sink/testutils/KafkaSinkExternalContext.java | 272 +++++++++
.../testutils/KafkaSinkExternalContextFactory.java | 53 ++
.../flink-end-to-end-tests-common-kafka/pom.xml | 10 +-
.../flink/tests/util/kafka/KafkaSinkE2ECase.java | 87 +++
.../flink-connector-test-utils/pom.xml | 26 +
.../environment/MiniClusterTestEnvironment.java | 22 +-
.../sink/DataStreamSinkExternalContext.java | 10 -
...t.java => DataStreamSinkV1ExternalContext.java} | 29 +-
.../sink/DataStreamSinkV2ExternalContext.java | 38 ++
.../testframe/source/FromElementsSource.java | 103 ++++
.../testframe/source/FromElementsSourceReader.java | 134 +++++
.../testframe/source/enumerator/NoOpEnumState.java | 22 +
.../source/enumerator/NoOpEnumStateSerializer.java | 41 ++
.../source/enumerator/NoOpEnumerator.java | 50 ++
.../testframe/source/split/FromElementsSplit.java | 46 ++
.../source/split/FromElementsSplitSerializer.java | 55 ++
.../testframe/testsuites/SinkTestSuiteBase.java | 643 +++++++++++++++++++++
.../testframe/testsuites/SourceTestSuiteBase.java | 15 +-
.../connector/testframe/utils/MetricQuerier.java | 43 +-
21 files changed, 1744 insertions(+), 70 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 2a35112..4234f7a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -31,12 +31,23 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
@@ -69,11 +81,15 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.TestTemplate;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
import javax.annotation.Nullable;
@@ -161,6 +177,36 @@ public class KafkaSinkITCase extends TestLogger {
deleteTestTopic(topic);
}
+ /** Integration test based on the connector testing framework. */
+ @Nested
+ class IntegrationTests extends SinkTestSuiteBase<String> {
+ // Defines test environment on Flink MiniCluster
+ @SuppressWarnings("unused")
+ @TestEnv
+ MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+ // Defines external system
+ @TestExternalSystem
+ DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+ DefaultContainerizedExternalSystem.builder()
+ .fromContainer(
+ new KafkaContainer(
+ DockerImageName.parse(DockerImageVersions.KAFKA)))
+ .build();
+
+ @SuppressWarnings("unused")
+ @TestSemantics
+ CheckpointingMode[] semantics =
+ new CheckpointingMode[] {
+ CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+ };
+
+ @SuppressWarnings("unused")
+ @TestContext
+ KafkaSinkExternalContextFactory sinkContext =
+ new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
+ }
+
@Test
public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception {
writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
new file mode 100644
index 0000000..0847cd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/** Kafka data reader for validating records written by a DataStream sink. */
+public class KafkaDataReader implements ExternalSystemDataReader<String> {
+ private final KafkaConsumer<String, String> consumer;
+
+ public KafkaDataReader(Properties properties, Collection<TopicPartition> partitions) {
+ this.consumer = new KafkaConsumer<>(properties);
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+ }
+
+ @Override
+ public List<String> poll(Duration timeout) {
+ List<String> result = new LinkedList<>();
+ ConsumerRecords<String, String> consumerRecords;
+ try {
+ consumerRecords = consumer.poll(timeout);
+ } catch (WakeupException we) {
+ return Collections.emptyList();
+ }
+ Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
+ while (iterator.hasNext()) {
+ result.add(iterator.next().value());
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+}
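For orientation, a test typically drains this reader in a loop until the expected records arrive or a deadline expires. A minimal sketch under assumed values (the endpoint, topic, and `expected` list are hypothetical; imports as in KafkaDataReader above plus org.apache.flink.api.common.time.Deadline, and the enclosing test method is assumed to declare `throws Exception`):

    // Sketch only: poll KafkaDataReader until the expected record count is reached.
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "reader-check");
    props.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getCanonicalName());
    props.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getCanonicalName());
    List<String> expected = Arrays.asList("a", "b", "c"); // hypothetical test data
    List<String> received = new ArrayList<>();
    try (KafkaDataReader reader =
            new KafkaDataReader(
                    props, Collections.singletonList(new TopicPartition("test-topic", 0)))) {
        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
        while (received.size() < expected.size() && deadline.hasTimeLeft()) {
            received.addAll(reader.poll(Duration.ofSeconds(1)));
        }
    }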
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
new file mode 100644
index 0000000..7c287d2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+
+/** A Kafka external context that will create only one topic and use partitions in that topic. */
+public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
+
+ private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
+ private static final long DEFAULT_TIMEOUT = 30L;
+ private static final int RANDOM_STRING_MAX_LENGTH = 50;
+ private static final int NUM_RECORDS_UPPER_BOUND = 500;
+ private static final int NUM_RECORDS_LOWER_BOUND = 100;
+ private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000;
+
+ protected String bootstrapServers;
+ protected final String topicName;
+
+ private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();
+
+ protected int numSplits = 0;
+
+ private List<URL> connectorJarPaths;
+
+ protected final AdminClient kafkaAdminClient;
+
+ public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
+ this.bootstrapServers = bootstrapServers;
+ this.connectorJarPaths = connectorJarPaths;
+ this.topicName =
+ TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+ kafkaAdminClient = createAdminClient();
+ }
+
+ private void createTopic(String topicName, int numPartitions, short replicationFactor) {
+ LOG.debug(
+ "Creating new Kafka topic {} with {} partitions and {} replicas",
+ topicName,
+ numPartitions,
+ replicationFactor);
+ NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+ try {
+ kafkaAdminClient
+ .createTopics(Collections.singletonList(newTopic))
+ .all()
+ .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
+ }
+ }
+
+ private void deleteTopic(String topicName) {
+ LOG.debug("Deleting Kafka topic {}", topicName);
+ try {
+ kafkaAdminClient
+ .deleteTopics(Collections.singletonList(topicName))
+ .all()
+ .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
+ throw new RuntimeException(
+ String.format("Cannot delete unknown Kafka topic '%s'", topicName), e);
+ }
+ }
+ }
+
+ private AdminClient createAdminClient() {
+ final Properties config = new Properties();
+ config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ return AdminClient.create(config);
+ }
+
+ @Override
+ public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+ if (!topicExists(topicName)) {
+ createTopic(topicName, 4, (short) 1);
+ }
+
+ KafkaSinkBuilder<String> builder = KafkaSink.builder();
+ final Properties properties = new Properties();
+ properties.put(
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT_IN_MS);
+ builder.setBootstrapServers(bootstrapServers)
+ .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
+ .setTransactionalIdPrefix("testingFramework")
+ .setKafkaProducerConfig(properties)
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(topicName)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .build());
+ return builder.build();
+ }
+
+ @Override
+ public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+ LOG.info("Fetching information for topic: {}", topicName);
+ final Map<String, TopicDescription> topicMetadata =
+ getTopicMetadata(Arrays.asList(topicName));
+
+ Set<TopicPartition> subscribedPartitions = new HashSet<>();
+ for (TopicDescription topic : topicMetadata.values()) {
+ for (TopicPartitionInfo partition : topic.partitions()) {
+ subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
+ }
+ }
+
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "flink-kafka-test" + subscribedPartitions.hashCode());
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+ properties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+ if (EXACTLY_ONCE.equals(sinkSettings.getCheckpointingMode())) {
+ // default is read_uncommitted
+ properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ }
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ readers.add(new KafkaDataReader(properties, subscribedPartitions));
+ return readers.get(readers.size() - 1);
+ }
+
+ @Override
+ public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+ Random random = new Random(seed);
+ List<String> randomStringRecords = new ArrayList<>();
+ int recordNum =
+ random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+ + NUM_RECORDS_LOWER_BOUND;
+ for (int i = 0; i < recordNum; i++) {
+ int stringLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+ randomStringRecords.add(RandomStringUtils.random(stringLength, true, true));
+ }
+ return randomStringRecords;
+ }
+
+ protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
+ try {
+ return kafkaAdminClient.describeTopics(topics).all().get();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to get metadata for topics %s.", topics), e);
+ }
+ }
+
+ private boolean topicExists(String topic) {
+ try {
+ kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (numSplits != 0) {
+ deleteTopic(topicName);
+ }
+ readers.stream()
+ .filter(Objects::nonNull)
+ .forEach(
+ reader -> {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ if (kafkaAdminClient != null) {
+ kafkaAdminClient.close();
+ }
+ throw new RuntimeException("Cannot close split writer", e);
+ }
+ });
+ readers.clear();
+ if (kafkaAdminClient != null) {
+ kafkaAdminClient.close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Single-topic Kafka";
+ }
+
+ @Override
+ public List<URL> getConnectorJarPaths() {
+ return connectorJarPaths;
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeInformation.of(String.class);
+ }
+
+ private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
+ switch (checkpointingMode) {
+ case EXACTLY_ONCE:
+ return DeliveryGuarantee.EXACTLY_ONCE;
+ case AT_LEAST_ONCE:
+ return DeliveryGuarantee.AT_LEAST_ONCE;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Only exactly-once and al-least-once checkpointing mode are supported, but actual is %s.",
+ checkpointingMode));
+ }
+ }
+}
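For context, the sink test suite (SinkTestSuiteBase, below) drives this context in three steps: create the sink, run a Flink job that writes generated data through it, then read the data back for comparison. A simplified sketch, assuming `TestingSinkSettings` exposes a builder and eliding the Flink job itself (the endpoint and seed are hypothetical):

    TestingSinkSettings settings =
            TestingSinkSettings.builder()
                    .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
                    .build();
    KafkaSinkExternalContext context =
            new KafkaSinkExternalContext("localhost:9092", Collections.emptyList());
    Sink<String> sink = context.createSink(settings);             // sink under test
    List<String> data = context.generateTestData(settings, 42L); // records a job writes via the sink
    ExternalSystemDataReader<String> reader =
            context.createSinkDataReader(settings);               // reads them back for comparison
    context.close();                                              // closes readers and the admin client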
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
new file mode 100644
index 0000000..b795854
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.KafkaContainer;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Kafka sink external context factory. */
+public class KafkaSinkExternalContextFactory
+ implements ExternalContextFactory<KafkaSinkExternalContext> {
+
+ private final KafkaContainer kafkaContainer;
+ private final List<URL> connectorJars;
+
+ public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) {
+ this.kafkaContainer = kafkaContainer;
+ this.connectorJars = connectorJars;
+ }
+
+ private String getBootstrapServer() {
+ final String internalEndpoints =
+ kafkaContainer.getNetworkAliases().stream()
+ .map(host -> String.join(":", host, Integer.toString(9092)))
+ .collect(Collectors.joining(","));
+ return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
+ }
+
+ @Override
+ public KafkaSinkExternalContext createExternalContext(String testName) {
+ return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars);
+ }
+}
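Note that getBootstrapServer() joins the host-mapped endpoint reported by Testcontainers with the broker's in-network alias on port 9092, so both the test JVM and containers on the same Docker network can reach Kafka. The combined string looks like `PLAINTEXT://localhost:55001,kafka:9092` (the mapped port and alias here are hypothetical examples).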
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index d37d55d..ec7bd0c 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -150,7 +150,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
@@ -225,6 +224,15 @@ under the License.
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <classifier>source</classifier>
+ <destFileName>flink-connector-testing.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
new file mode 100644
index 0000000..520491e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** Kafka sink E2E test based on the connector testing framework. */
+@SuppressWarnings("unused")
+public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
+ private static final String KAFKA_HOSTNAME = "kafka";
+
+ @TestSemantics
+ CheckpointingMode[] semantics =
+ new CheckpointingMode[] {
+ CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+ };
+
+ // Defines TestEnvironment
+ @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+ // Defines ConnectorExternalSystem
+ @TestExternalSystem
+ DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+ DefaultContainerizedExternalSystem.builder()
+ .fromContainer(
+ new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+ .withNetworkAliases(KAFKA_HOSTNAME))
+ .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+ .build();
+
+ // Defines the external context factory; each test case will be invoked with the
+ // external contexts created by this factory.
+ @TestContext
+ KafkaSinkExternalContextFactory contextFactory =
+ new KafkaSinkExternalContextFactory(
+ kafka.getContainer(),
+ Arrays.asList(
+ TestUtils.getResource("kafka-connector.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL(),
+ TestUtils.getResource("kafka-clients.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL(),
+ TestUtils.getResource("flink-connector-testing.jar")
+ .toAbsolutePath()
+ .toUri()
+ .toURL()));
+
+ public KafkaSinkE2ECase() throws Exception {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/pom.xml b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
index 13aeea7..b3c1067 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
@@ -95,4 +95,30 @@
<scope>compile</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>source</shadedClassifierName>
+ <artifactSet>
+ <includes>
+ <include>**/connector/testframe/source/**</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
index c5f29d1..0ecbe7f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
@@ -30,17 +30,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -161,7 +160,7 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
}
isStarted = false;
this.miniCluster.after();
- deletePath(checkpointPath);
+ FileUtils.deleteDirectory(checkpointPath.toFile());
LOG.debug("MiniCluster has been tear down");
}
@@ -180,21 +179,4 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
public String toString() {
return "MiniCluster";
}
-
- /** Deletes the given path recursively. */
- public static void deletePath(Path path) throws IOException {
- final List<File> files =
- Files.walk(path)
- .filter(p -> p != path)
- .map(Path::toFile)
- .collect(Collectors.toList());
- for (File file : files) {
- if (file.isDirectory()) {
- deletePath(file.toPath());
- } else {
- file.delete();
- }
- }
- Files.deleteIfExists(path);
- }
}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
index 93d638c..25e638f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.testframe.external.sink;
import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
@@ -34,15 +33,6 @@ import java.util.List;
@Experimental
public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
- /**
- * Create an instance of {@link Sink} satisfying given options.
- *
- * @param sinkSettings settings of the sink
- * @throws UnsupportedOperationException if the provided option is not supported.
- */
- Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
- throws UnsupportedOperationException;
-
/** Create a reader for consuming data written to the external system by sink. */
ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
similarity index 53%
copy from flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
copy to flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
index 93d638c..da36a22 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
@@ -20,20 +20,14 @@ package org.apache.flink.connector.testframe.external.sink;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.testframe.external.ExternalContext;
-import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
-
-import java.util.List;
/**
- * External context for DataStream sinks.
+ * External context for DataStream sinks based on the V1 sink API.
*
* @param <T> Type of elements before serialization by sink
*/
@Experimental
-public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
-
+public interface DataStreamSinkV1ExternalContext<T> extends DataStreamSinkExternalContext<T> {
/**
* Create an instance of {@link Sink} satisfying given options.
*
@@ -42,23 +36,4 @@ public interface DataStreamSinkExternalContext<T> extends ExternalContext, Resul
*/
Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
throws UnsupportedOperationException;
-
- /** Create a reader for consuming data written to the external system by sink. */
- ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
-
- /**
- * Generate test data.
- *
- * <p>These test data will be sent to sink via a special source in Flink job, write to external
- * system by sink, consume back via {@link ExternalSystemDataReader}, and make comparison with
- * {@link T#equals(Object)} for validating correctness.
- *
- * <p>Make sure that the {@link T#equals(Object)} returns false when the records in different
- * splits.
- *
- * @param sinkSettings settings of the sink
- * @param seed Seed for generating random test data set.
- * @return List of generated test data.
- */
- List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
new file mode 100644
index 0000000..ce2cf07
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.external.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.sink2.Sink;
+
+/**
+ * External context for DataStream sinks based on the V2 sink API.
+ *
+ * @param <T> Type of elements before serialization by sink
+ */
+@Experimental
+public interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
+ /**
+ * Create an instance of {@link Sink} satisfying given options.
+ *
+ * @param sinkSettings settings of the sink
+ * @throws UnsupportedOperationException if the provided option is not supported.
+ */
+ Sink<T> createSink(TestingSinkSettings sinkSettings) throws UnsupportedOperationException;
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
new file mode 100644
index 0000000..8f98e2c
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * A {@link Source} implementation that reads data from a list and stops emitting at a fixed
+ * position until a checkpoint or savepoint is triggered, which makes the source useful for
+ * connector tests.
+ *
+ * <p>Note: The parallelism of this source must be 1.
+ */
+public class FromElementsSource<OUT> implements Source<OUT, FromElementsSplit, NoOpEnumState> {
+ private Boundedness boundedness;
+
+ private List<OUT> elements;
+
+ private Integer emittedElementsNum;
+
+ public FromElementsSource(List<OUT> elements) {
+ this.elements = elements;
+ }
+
+ public FromElementsSource(
+ Boundedness boundedness, List<OUT> elements, Integer emittedElementsNum) {
+ this(elements);
+ if (emittedElementsNum != null) {
+ Preconditions.checkState(
+ emittedElementsNum <= elements.size(),
+ String.format(
+ "The emittedElementsNum must not be larger than the elements list %d, but actual emittedElementsNum is %d",
+ elements.size(), emittedElementsNum));
+ }
+ this.boundedness = boundedness;
+ this.emittedElementsNum = emittedElementsNum;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness == null ? Boundedness.BOUNDED : boundedness;
+ }
+
+ @Override
+ public SourceReader<OUT, FromElementsSplit> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return new FromElementsSourceReader<>(
+ emittedElementsNum, elements, boundedness, readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<FromElementsSplit, NoOpEnumState> createEnumerator(
+ SplitEnumeratorContext<FromElementsSplit> enumContext) throws Exception {
+ return new NoOpEnumerator();
+ }
+
+ @Override
+ public SplitEnumerator<FromElementsSplit, NoOpEnumState> restoreEnumerator(
+ SplitEnumeratorContext<FromElementsSplit> enumContext, NoOpEnumState checkpoint)
+ throws Exception {
+ return new NoOpEnumerator();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<FromElementsSplit> getSplitSerializer() {
+ return new FromElementsSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+ return new NoOpEnumStateSerializer();
+ }
+}
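For orientation, a test job built on this source looks roughly as follows (a simplified sketch; `testRecords`, the checkpoint interval, and `sinkUnderTest` are hypothetical, and the enclosing method is assumed to declare `throws Exception`):

    // Sketch only: an unbounded run that pauses after the last record until a checkpoint is taken.
    List<String> testRecords = Arrays.asList("a", "b", "c"); // hypothetical
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(50L);
    env.fromSource(
                    new FromElementsSource<>(
                            Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()),
                    WatermarkStrategy.noWatermarks(),
                    "elementsSource")
            .setParallelism(1) // the source requires parallelism 1
            .sinkTo(sinkUnderTest); // e.g. the sink created by an external context
    env.execute();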
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
new file mode 100644
index 0000000..e00942a
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.Counter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
+
+/**
+ * A {@link SourceReader} implementation that reads data from a list. If limitedNum is set, the
+ * reader stops emitting at the limitedNum position until a checkpoint or savepoint is triggered.
+ */
+public class FromElementsSourceReader<T> implements SourceReader<T, FromElementsSplit> {
+ private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class);
+
+ private volatile int emittedNum;
+ private volatile boolean isRunning = true;
+
+ /** The context of this source reader. */
+ private SourceReaderContext context;
+
+ private Integer limitedNum;
+ private Boundedness boundedness;
+ private volatile boolean checkpointAtLimitedNum = false;
+ private List<T> elements;
+ private Counter numRecordInCounter;
+
+ public FromElementsSourceReader(
+ Integer limitedNum,
+ List<T> elements,
+ Boundedness boundedness,
+ SourceReaderContext context) {
+ this.context = context;
+ this.emittedNum = 0;
+ this.elements = elements;
+ this.limitedNum = limitedNum;
+ this.boundedness = boundedness;
+ this.numRecordInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
+ if (isRunning && emittedNum < elements.size()) {
+ /*
+ * The reader stops emitting once it has produced `limitedNum` records.
+ * If and only if a checkpoint is taken while exactly `limitedNum` records
+ * have been emitted, the reader continues emitting afterwards.
+ *
+ * When checkpointing is disabled and the job is stopped with a savepoint
+ * after emitting `limitedNum` records, the job restored from that savepoint
+ * continues with the records after the first `limitedNum` records.
+ */
+ if (limitedNum == null
+ || (limitedNum != null
+ && (emittedNum < limitedNum || checkpointAtLimitedNum))) {
+ output.collect(elements.get(emittedNum));
+ emittedNum++;
+ numRecordInCounter.inc();
+ }
+ return MORE_AVAILABLE;
+ }
+
+ if (Boundedness.CONTINUOUS_UNBOUNDED.equals(boundedness)) {
+ return MORE_AVAILABLE;
+ } else {
+ return InputStatus.END_OF_INPUT;
+ }
+ }
+
+ @Override
+ public List<FromElementsSplit> snapshotState(long checkpointId) {
+ if (limitedNum != null && !checkpointAtLimitedNum && emittedNum == limitedNum) {
+ checkpointAtLimitedNum = true;
+ LOG.info("checkpoint {} is the target checkpoint to be used.", checkpointId);
+ }
+ return Arrays.asList(new FromElementsSplit(emittedNum));
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addSplits(List<FromElementsSplit> splits) {
+ emittedNum = splits.get(0).getEmitNum();
+ LOG.info("FromElementsSourceReader restores from {}.", emittedNum);
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {}
+
+ @Override
+ public void close() throws Exception {
+ isRunning = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ LOG.info("checkpoint {} finished.", checkpointId);
+ }
+}
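To make the gating in pollNext/snapshotState concrete, a short trace with limitedNum = 3 and five elements (the checkpoint id is hypothetical):

    pollNext():       emits e0, e1, e2, then idles (emittedNum == limitedNum, gate closed)
    snapshotState(7): emittedNum == limitedNum, so checkpointAtLimitedNum becomes true
    pollNext():       gate open again, emits e3, e4
    afterwards:       CONTINUOUS_UNBOUNDED keeps returning MORE_AVAILABLE; BOUNDED returns END_OF_INPUT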
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
new file mode 100644
index 0000000..a2bb8cc
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+/** Mock enumerator state. */
+public class NoOpEnumState {}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
new file mode 100644
index 0000000..7be0e8d
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/** Mock enumerator state serializer. */
+public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(NoOpEnumState obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
+ return new NoOpEnumState();
+ }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
new file mode 100644
index 0000000..b23bfa7
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Mock enumerator. */
+public class NoOpEnumerator implements SplitEnumerator<FromElementsSplit, NoOpEnumState> {
+ @Override
+ public void start() {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+ @Override
+ public void addSplitsBack(List<FromElementsSplit> splits, int subtaskId) {}
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+ return new NoOpEnumState();
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
new file mode 100644
index 0000000..d5ae8ad
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+
+/** The split of the {@link FromElementsSource}. */
+public class FromElementsSplit implements SourceSplit {
+ public static final String SPLIT_ID = "fakeSplitId";
+
+ private int emitNum;
+
+ public FromElementsSplit(int emitNum) {
+ this.emitNum = emitNum;
+ }
+
+ public int getEmitNum() {
+ return emitNum;
+ }
+
+ public void setEmitNum(int emitNum) {
+ this.emitNum = emitNum;
+ }
+
+ @Override
+ public String splitId() {
+ return SPLIT_ID;
+ }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
new file mode 100644
index 0000000..ab32917
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** The split serializer for the {@link FromElementsSource}. */
+public class FromElementsSplitSerializer implements SimpleVersionedSerializer<FromElementsSplit> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(FromElementsSplit split) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeInt(split.getEmitNum());
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public FromElementsSplit deserialize(int version, byte[] serialized) throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ int emitNum = in.readInt();
+ return new FromElementsSplit(emitNum);
+ }
+ }
+}
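A minimal round-trip sketch of the serializer:

    FromElementsSplitSerializer serializer = new FromElementsSplitSerializer();
    byte[] bytes = serializer.serialize(new FromElementsSplit(42));
    FromElementsSplit restored = serializer.deserialize(serializer.getVersion(), bytes);
    // restored.getEmitNum() == 42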
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
new file mode 100644
index 0000000..2c13e83
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connector.testframe.utils.MetricQuerier;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.commons.math3.util.Precision;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Base class for sink test suites.
+ *
+ * <p>All cases should have descriptive JavaDoc, including:
+ *
+ * <ul>
+ * <li>The purpose of this case
+ * <li>A simple description of how this case works
+ * <li>The condition that must be fulfilled in order to pass this case
+ * <li>The requirements for running this case
+ * </ul>
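+ *
+ * <p>A minimal sketch of a concrete suite, assuming a MiniCluster-based environment; the class
+ * and factory names here are illustrative, not part of the framework:
+ *
+ * <pre>{@code
+ * public class MySinkTestSuite extends SinkTestSuiteBase<String> {
+ *     // Flink test environment the cases run against
+ *     @TestEnv
+ *     MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+ *
+ *     // External system hosting the sink, e.g. a containerized broker
+ *     @TestExternalSystem
+ *     DefaultContainerizedExternalSystem<?> system = ...;
+ *
+ *     // Delivery guarantees to verify
+ *     @TestSemantics
+ *     CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+ *
+ *     // Connector-specific external context factory (hypothetical)
+ *     @TestContext
+ *     MySinkExternalContextFactory contexts = new MySinkExternalContextFactory();
+ * }
+ * }</pre>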
+ */
+@ExtendWith({
+ ConnectorTestingExtension.class,
+ TestLoggerExtension.class,
+ TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+
+ // ----------------------------- Basic test cases ---------------------------------
+
+ /**
+ * Test the DataStream connector sink.
+ *
+ * <p>This test will create a sink in the external system, generate a collection of test data
+ * and write it to this sink with a Flink job.
+ *
+ * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+ * number of generated test records, and the records in the sink are compared to the test data
+ * under the configured semantic. There is no requirement on record order.
+ */
+ @TestTemplate
+ @DisplayName("Test data stream sink")
+ public void testBasicSink(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+ final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+ // Build and execute Flink job
+ StreamExecutionEnvironment execEnv =
+ testEnv.createExecutionEnvironment(
+ TestEnvironmentSettings.builder()
+ .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+ .build());
+ execEnv.enableCheckpointing(50);
+ DataStream<T> dataStream =
+ execEnv.fromCollection(testRecords)
+ .name("sourceInSinkTest")
+ .setParallelism(1)
+ .returns(externalContext.getProducedType());
+ tryCreateSink(dataStream, externalContext, sinkSettings)
+ .setParallelism(1)
+ .name("sinkInSinkTest");
+ final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(JobStatus.FINISHED),
+ Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+ // Check test result
+ checkResultWithSemantic(
+ externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+ }
+
+ /**
+ * Test restarting the connector sink from a completed savepoint with the same parallelism.
+ *
+ * <p>This test will create a sink in the external system, generate a collection of test data
+ * and first write half of it to the sink with a Flink job running at parallelism 2. It then
+ * stops the job with a savepoint and restarts the same job from that savepoint. Once the job
+ * is running again, the remaining half is written to the sink and the result is compared.
+ *
+ * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+ * number of generated test records, and the records in the sink are compared to the test data
+ * under the configured semantic. There is no requirement on record order.
+ */
+ @TestTemplate
+ @DisplayName("Test sink restarting from a savepoint")
+ public void testStartFromSavepoint(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+ }
+
+ /**
+ * Test restarting the connector sink from a completed savepoint with a higher parallelism.
+ *
+ * <p>This test will create a sink in the external system, generate a collection of test data
+ * and first write half of it to the sink with a Flink job running at parallelism 2. It then
+ * stops the job with a savepoint and restarts the same job from that savepoint with a higher
+ * parallelism of 4. Once the job is running again, the remaining half is written to the sink
+ * and the result is compared.
+ *
+ * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+ * number of generated test records, and the records in the sink are compared to the test data
+ * under the configured semantic. There is no requirement on record order.
+ */
+ @TestTemplate
+ @DisplayName("Test sink restarting with a higher parallelism")
+ public void testScaleUp(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+ }
+
+ /**
+ * Test restarting the connector sink from a completed savepoint with a lower parallelism.
+ *
+ * <p>This test will create a sink in the external system, generate a collection of test data
+ * and first write half of it to the sink with a Flink job running at parallelism 4. It then
+ * stops the job with a savepoint and restarts the same job from that savepoint with a lower
+ * parallelism of 2. Once the job is running again, the remaining half is written to the sink
+ * and the result is compared.
+ *
+ * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+ * number of generated test records, and the records in the sink are compared to the test data
+ * under the configured semantic. There is no requirement on record order.
+ */
+ @TestTemplate
+ @DisplayName("Test sink restarting with a lower parallelism")
+ public void testScaleDown(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+ }
+
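+ /**
+ * Shared implementation of the savepoint, scale-up and scale-down cases: run the job with
+ * {@code beforeParallelism}, stop it with a savepoint after half of the records have been
+ * written, then restore from that savepoint with {@code afterParallelism} and verify the full
+ * result.
+ */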
+ private void restartFromSavepoint(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic,
+ final int beforeParallelism,
+ final int afterParallelism)
+ throws Exception {
+ // Step 1: Preparation
+ TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+ final StreamExecutionEnvironment execEnv =
+ testEnv.createExecutionEnvironment(
+ TestEnvironmentSettings.builder()
+ .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+ .build());
+ execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+ // Step 2: Generate test data
+ final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+ // Step 3: Build and execute Flink job
+ int numBeforeSuccess = testRecords.size() / 2;
+ DataStreamSource<T> source =
+ execEnv.fromSource(
+ new FromElementsSource<>(
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ testRecords,
+ numBeforeSuccess),
+ WatermarkStrategy.noWatermarks(),
+ "beforeRestartSource")
+ .setParallelism(1);
+
+ DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+ tryCreateSink(dataStream, externalContext, sinkSettings)
+ .name("Sink restart test")
+ .setParallelism(beforeParallelism);
+
+ /*
+ * The job should stop after consuming a specified number of records. In order to know when
+ * that number of records has been consumed, a collect sink needs to be watched.
+ */
+ CollectResultIterator<T> iterator = addCollectSink(source);
+ final JobClient jobClient = execEnv.executeAsync("Restart Test");
+ iterator.setJobClient(jobClient);
+
+ // Step 4: Wait for the expected result and stop Flink job with a savepoint
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ String savepointPath;
+ try {
+ waitForAllTaskRunning(
+ () ->
+ getJobDetails(
+ new RestClient(new Configuration(), executorService),
+ testEnv.getRestEndpoint(),
+ jobClient.getJobID()),
+ Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+ waitExpectedSizeData(iterator, numBeforeSuccess);
+
+ savepointPath =
+ jobClient
+ .stopWithSavepoint(
+ true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL)
+ .get(30, TimeUnit.SECONDS);
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(JobStatus.FINISHED),
+ Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+ } catch (Exception e) {
+ executorService.shutdown();
+ killJob(jobClient);
+ throw e;
+ }
+
+ List<T> target = testRecords.subList(0, numBeforeSuccess);
+ checkResultWithSemantic(
+ externalContext.createSinkDataReader(sinkSettings), target, semantic);
+
+ // Step 5: Restart the Flink job from the savepoint
+ final StreamExecutionEnvironment restartEnv =
+ testEnv.createExecutionEnvironment(
+ TestEnvironmentSettings.builder()
+ .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+ .setSavepointRestorePath(savepointPath)
+ .build());
+ restartEnv.enableCheckpointing(50);
+
+ DataStreamSource<T> restartSource =
+ restartEnv
+ .fromSource(
+ new FromElementsSource<>(
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ testRecords,
+ testRecords.size()),
+ WatermarkStrategy.noWatermarks(),
+ "restartSource")
+ .setParallelism(1);
+
+ DataStream<T> sinkStream = restartSource.returns(externalContext.getProducedType());
+ tryCreateSink(sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism);
+ addCollectSink(restartSource);
+ final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
+
+ try {
+ // Check the result
+ checkResultWithSemantic(
+ externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+ } finally {
+ executorService.shutdown();
+ killJob(restartJobClient);
+ iterator.close();
+ }
+ }
+
+ /**
+ * Test connector sink metrics.
+ *
+ * <p>This test will create a sink in the external system, generate test data and write it to
+ * the sink via a Flink job. It then reads and compares the metrics.
+ *
+ * <p>Currently tested metric: numRecordsOut.
+ */
+ @TestTemplate
+ @DisplayName("Test sink metrics")
+ public void testMetrics(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> externalContext,
+ CheckpointingMode semantic)
+ throws Exception {
+ TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+ int parallelism = 1;
+ final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+ // make sure to use a different name each time the test is executed
+ String sinkName = "metricTestSink" + testRecords.hashCode();
+ final StreamExecutionEnvironment env =
+ testEnv.createExecutionEnvironment(
+ TestEnvironmentSettings.builder()
+ .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+ .build());
+ env.enableCheckpointing(50);
+
+ DataStreamSource<T> source =
+ env.fromSource(
+ new FromElementsSource<>(
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ testRecords,
+ testRecords.size()),
+ WatermarkStrategy.noWatermarks(),
+ "metricTestSource")
+ .setParallelism(1);
+
+ DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+ tryCreateSink(dataStream, externalContext, sinkSettings)
+ .name(sinkName)
+ .setParallelism(parallelism);
+ final JobClient jobClient = env.executeAsync("Metrics Test");
+ final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ waitForAllTaskRunning(
+ () ->
+ getJobDetails(
+ new RestClient(new Configuration(), executorService),
+ testEnv.getRestEndpoint(),
+ jobClient.getJobID()),
+ Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+ waitUntilCondition(
+ () -> {
+ // test metrics
+ try {
+ return compareSinkMetrics(
+ queryRestClient,
+ testEnv,
+ externalContext,
+ jobClient.getJobID(),
+ sinkName,
+ testRecords.size());
+ } catch (Exception e) {
+ // skip failed assert try
+ return false;
+ }
+ },
+ Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+ } finally {
+ // Clean up
+ executorService.shutdown();
+ killJob(jobClient);
+ }
+ }
+
+ // ----------------------------- Helper Functions ---------------------------------
+
+ /**
+ * Generate a list of test records.
+ *
+ * @param testingSinkSettings The sink settings
+ * @param externalContext The external context
+ * @return The list of generated test records
+ */
+ protected List<T> generateTestData(
+ TestingSinkSettings testingSinkSettings,
+ DataStreamSinkExternalContext<T> externalContext) {
+ return externalContext.generateTestData(
+ testingSinkSettings, ThreadLocalRandom.current().nextLong());
+ }
+
+ /**
+ * Poll records from the sink and append them to the result list.
+ *
+ * @param result The list to which polled records are appended
+ * @param reader The sink data reader
+ * @param expected The expected records, used to decide when to stop polling
+ * @param retryTimes The maximum number of poll attempts
+ * @param semantic The delivery semantic
+ * @return The list of records read from the sink
+ */
+ private List<T> pollAndAppendResultData(
+ List<T> result,
+ ExternalSystemDataReader<T> reader,
+ List<T> expected,
+ int retryTimes,
+ CheckpointingMode semantic) {
+ long timeoutMs = 1000L;
+ int retryIndex = 0;
+
+ while (retryIndex++ < retryTimes
+ && !checkGetEnoughRecordsWithSemantic(expected, result, semantic)) {
+ result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+ }
+ return result;
+ }
+
+ /**
+ * Check whether the polling should stop.
+ *
+ * <p>Under EXACTLY_ONCE, polling stops once the result contains at least as many records as
+ * expected. Under AT_LEAST_ONCE, polling stops once every expected record has been matched in
+ * the result at least once; duplicates in the result are tolerated. For instance, the expected
+ * records [a, b] are satisfied by the result [a, a, b] but not by [a, a].
+ *
+ * @param expected The expected records, used to decide when to stop polling
+ * @param result The records that have been read so far
+ * @param semantic The delivery semantic
+ * @return Whether the polling should stop
+ */
+ private boolean checkGetEnoughRecordsWithSemantic(
+ List<T> expected, List<T> result, CheckpointingMode semantic) {
+ checkNotNull(expected);
+ checkNotNull(result);
+ if (EXACTLY_ONCE.equals(semantic)) {
+ return expected.size() <= result.size();
+ } else if (AT_LEAST_ONCE.equals(semantic)) {
+ Set<Integer> matchedIndex = new HashSet<>();
+ for (T record : expected) {
+ int before = matchedIndex.size();
+ for (int i = 0; i < result.size(); i++) {
+ if (matchedIndex.contains(i)) {
+ continue;
+ }
+ if (record.equals(result.get(i))) {
+ matchedIndex.add(i);
+ break;
+ }
+ }
+ // the record was not found in the result
+ if (before == matchedIndex.size()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ throw new IllegalStateException(
+ String.format("The %s delivery guarantee is not supported by this test.", semantic.name()));
+ }
+
+ /**
+ * Compare the test data with the actual data under the given semantic.
+ *
+ * @param reader The data reader for the sink
+ * @param testData The test data
+ * @param semantic The delivery semantic, see {@link CheckpointingMode}
+ */
+ private void checkResultWithSemantic(
+ ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+ throws Exception {
+ final ArrayList<T> result = new ArrayList<>();
+ waitUntilCondition(
+ () -> {
+ pollAndAppendResultData(result, reader, testData, 30, semantic);
+ try {
+ CollectIteratorAssertions.assertThat(sort(result).iterator())
+ .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ },
+ Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+ }
+
+ /** Compare the expected record count with the numRecordsOut metric reported for the sink. */
+ private boolean compareSinkMetrics(
+ MetricQuerier metricQuerier,
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<T> context,
+ JobID jobId,
+ String sinkName,
+ long allRecordSize)
+ throws Exception {
+ double sumNumRecordsOut =
+ metricQuerier.getAggregatedMetricsByRestAPI(
+ testEnv.getRestEndpoint(),
+ jobId,
+ sinkName,
+ MetricNames.IO_NUM_RECORDS_OUT,
+ getSinkMetricFilter(context));
+ return Precision.equals(allRecordSize, sumNumRecordsOut);
+ }
+
+ /** Sort the list. */
+ private List<T> sort(List<T> list) {
+ return list.stream().sorted().collect(Collectors.toList());
+ }
+
+ private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
+ return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build();
+ }
+
+ private void killJob(JobClient jobClient) throws Exception {
+ terminateJob(jobClient);
+ waitForJobStatus(
+ jobClient,
+ Collections.singletonList(JobStatus.CANCELED),
+ Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+ }
+
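+ /**
+ * Create the sink for the given context, supporting both Sink V1 and Sink V2 contexts, and
+ * abort the test if the context cannot create a sink satisfying the given settings.
+ */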
+ private DataStreamSink<T> tryCreateSink(
+ DataStream<T> dataStream,
+ DataStreamSinkExternalContext<T> context,
+ TestingSinkSettings sinkSettings) {
+ try {
+ if (context instanceof DataStreamSinkV1ExternalContext) {
+ org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sinkV1 =
+ ((DataStreamSinkV1ExternalContext<T>) context).createSink(sinkSettings);
+ return dataStream.sinkTo(sinkV1);
+ } else if (context instanceof DataStreamSinkV2ExternalContext) {
+ Sink<T> sinkV2 =
+ ((DataStreamSinkV2ExternalContext<T>) context).createSink(sinkSettings);
+ return dataStream.sinkTo(sinkV2);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.",
+ context.getClass()));
+ }
+ } catch (UnsupportedOperationException e) {
+ // abort the test
+ throw new TestAbortedException("Cannot create a sink satisfying given options.", e);
+ }
+ }
+
+ /**
+ * Return the filter used to select the sink metric.
+ *
+ * <ul>
+ * <li>Sink V1: return null, since no extra filtering is needed.
+ * <li>Sink V2: return "Writer", which is part of the writer operator name assigned in
+ * {@code SinkTransformationTranslator}.
+ * </ul>
+ */
+ private String getSinkMetricFilter(DataStreamSinkExternalContext<T> context) {
+ if (context instanceof DataStreamSinkV1ExternalContext) {
+ return null;
+ } else if (context instanceof DataStreamSinkV2ExternalContext) {
+ // See class `SinkTransformationTranslator`
+ return "Writer";
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Get unexpected sink context: %s", context.getClass()));
+ }
+ }
+
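+ /**
+ * Attach a collect sink to the given stream so the test can observe, on the client side, how
+ * many records the source has emitted while the job is running.
+ */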
+ protected CollectResultIterator<T> addCollectSink(DataStream<T> stream) {
+ TypeSerializer<T> serializer =
+ stream.getType().createSerializer(stream.getExecutionConfig());
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectSinkOperatorFactory<T> factory =
+ new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+ CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+ CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
+ sink.name("Data stream collect sink");
+ stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+ return new CollectResultIterator<>(
+ operator.getOperatorIdFuture(),
+ serializer,
+ accumulatorName,
+ stream.getExecutionEnvironment().getCheckpointConfig());
+ }
+
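+ /** Block until {@code targetNum} records have been observed from the collect sink, or time out. */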
+ private void waitExpectedSizeData(CollectResultIterator<T> iterator, int targetNum) {
+ assertThat(
+ CompletableFuture.supplyAsync(
+ () -> {
+ int count = 0;
+ while (count < targetNum && iterator.hasNext()) {
+ iterator.next();
+ count++;
+ }
+ if (count < targetNum) {
+ throw new IllegalStateException(
+ String.format(
+ "Fail to get %d records.", targetNum));
+ }
+ return true;
+ }))
+ .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+ }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 992e12a..a90d05b 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -70,11 +71,14 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
@@ -447,11 +451,14 @@ public abstract class SourceTestSuiteBase<T> {
final JobClient jobClient = env.executeAsync("Metrics Test");
final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+ final ExecutorService executorService = Executors.newCachedThreadPool();
try {
waitForAllTaskRunning(
() ->
- queryRestClient.getJobDetails(
- testEnv.getRestEndpoint(), jobClient.getJobID()),
+ getJobDetails(
+ new RestClient(new Configuration(), executorService),
+ testEnv.getRestEndpoint(),
+ jobClient.getJobID()),
Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
waitUntilCondition(
@@ -472,6 +479,7 @@ public abstract class SourceTestSuiteBase<T> {
Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
} finally {
// Clean up
+ executorService.shutdown();
killJob(jobClient);
}
}
@@ -775,7 +783,8 @@ public abstract class SourceTestSuiteBase<T> {
testEnv.getRestEndpoint(),
jobId,
sourceName,
- MetricNames.IO_NUM_RECORDS_IN);
+ MetricNames.IO_NUM_RECORDS_IN,
+ null);
return Precision.equals(allRecordSize, sumNumRecordsIn);
}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
index 9fe13c1..d749132 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
@@ -40,6 +41,9 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -56,16 +60,15 @@ public class MetricQuerier {
restClient = new RestClient(configuration, Executors.newCachedThreadPool());
}
- public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)
- throws Exception {
+ public static JobDetailsInfo getJobDetails(
+ RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception {
String jmAddress = endpoint.getAddress();
int jmPort = endpoint.getPort();
final JobMessageParameters params = new JobMessageParameters();
params.jobPathParameter.resolve(jobId);
- return restClient
- .sendRequest(
+ return client.sendRequest(
jmAddress,
jmPort,
JobDetailsHeaders.getInstance(),
@@ -120,10 +123,11 @@ public class MetricQuerier {
TestEnvironment.Endpoint endpoint,
JobID jobId,
String sourceOrSinkName,
- String metricName)
+ String metricName,
+ String filter)
throws Exception {
// get job details, including the vertex id
- JobDetailsInfo jobDetailsInfo = getJobDetails(endpoint, jobId);
+ JobDetailsInfo jobDetailsInfo = getJobDetails(restClient, endpoint, jobId);
// get the vertex id for source/sink operator
JobDetailsInfo.JobVertexDetailsInfo vertex =
@@ -143,8 +147,8 @@ public class MetricQuerier {
metricsResponseBody.getMetrics().stream()
.filter(
m ->
- m.getId().endsWith(metricName)
- && m.getId().contains(sourceOrSinkName))
+ filterByMetricName(
+ m.getId(), sourceOrSinkName, metricName, filter))
.map(m -> m.getId())
.collect(Collectors.joining(","));
@@ -157,6 +161,27 @@ public class MetricQuerier {
AggregatedMetricsResponseBody metricsResponse =
getMetrics(endpoint, jobId, vertexId, queryParam);
- return metricsResponse.getMetrics().iterator().next().getSum();
+
+ Collection<AggregatedMetric> metrics = metricsResponse.getMetrics();
+ if (metrics == null || metrics.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot find metric[%s] for operator [%s] with filter [%s].",
+ metricName, sourceOrSinkName, filter));
+ }
+ return metrics.iterator().next().getSum();
+ }
+
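+ /**
+ * Check whether a metric id belongs to the queried operator: it has to end with the target
+ * metric name, contain the operator name and, if a filter is given, contain the filter too.
+ */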
+ private boolean filterByMetricName(
+ String metricName,
+ String sourceOrSinkName,
+ String targetMetricName,
+ @Nullable String filter) {
+ boolean filterByName =
+ metricName.endsWith(targetMetricName) && metricName.contains(sourceOrSinkName);
+ if (!StringUtils.isNullOrWhitespaceOnly(filter)) {
+ return filterByName && metricName.contains(filter);
+ }
+ return filterByName;
}
}