You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/02/16 01:31:56 UTC

[flink] 01/02: [FLINK-25289][tests] Introduce sink test suite in connector test framework

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57e3f03ccd719ed772c983ba335517d95f8f3e6a
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Tue Feb 15 20:15:21 2022 +0800

    [FLINK-25289][tests] Introduce sink test suite in connector test framework
    
    This closes #18496.
---
 .../connector/kafka/sink/KafkaSinkITCase.java      |  46 ++
 .../kafka/sink/testutils/KafkaDataReader.java      |  69 +++
 .../sink/testutils/KafkaSinkExternalContext.java   | 272 +++++++++
 .../testutils/KafkaSinkExternalContextFactory.java |  53 ++
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  10 +-
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   |  87 +++
 .../flink-connector-test-utils/pom.xml             |  26 +
 .../environment/MiniClusterTestEnvironment.java    |  22 +-
 .../sink/DataStreamSinkExternalContext.java        |  10 -
 ...t.java => DataStreamSinkV1ExternalContext.java} |  29 +-
 .../sink/DataStreamSinkV2ExternalContext.java      |  38 ++
 .../testframe/source/FromElementsSource.java       | 103 ++++
 .../testframe/source/FromElementsSourceReader.java | 134 +++++
 .../testframe/source/enumerator/NoOpEnumState.java |  22 +
 .../source/enumerator/NoOpEnumStateSerializer.java |  41 ++
 .../source/enumerator/NoOpEnumerator.java          |  50 ++
 .../testframe/source/split/FromElementsSplit.java  |  46 ++
 .../source/split/FromElementsSplitSerializer.java  |  55 ++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 643 +++++++++++++++++++++
 .../testframe/testsuites/SourceTestSuiteBase.java  |  15 +-
 .../connector/testframe/utils/MetricQuerier.java   |  43 +-
 21 files changed, 1744 insertions(+), 70 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 2a35112..4234f7a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -31,12 +31,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
@@ -69,11 +81,15 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
 
 import javax.annotation.Nullable;
 
@@ -161,6 +177,36 @@ public class KafkaSinkITCase extends TestLogger {
         deleteTestTopic(topic);
     }
 
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTests extends SinkTestSuiteBase<String> {
+        // Defines test environment on Flink MiniCluster
+        @SuppressWarnings("unused")
+        @TestEnv
+        MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        // Defines external system
+        @TestExternalSystem
+        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+                DefaultContainerizedExternalSystem.builder()
+                        .fromContainer(
+                                new KafkaContainer(
+                                        DockerImageName.parse(DockerImageVersions.KAFKA)))
+                        .build();
+
+        @SuppressWarnings("unused")
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @SuppressWarnings("unused")
+        @TestContext
+        KafkaSinkExternalContextFactory sinkContext =
+                new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
+    }
+
     @Test
     public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception {
         writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
new file mode 100644
index 0000000..0847cd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/** Kafka dataStream data reader. */
+public class KafkaDataReader implements ExternalSystemDataReader<String> {
+    private final KafkaConsumer<String, String> consumer;
+
+    public KafkaDataReader(Properties properties, Collection<TopicPartition> partitions) {
+        this.consumer = new KafkaConsumer<>(properties);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+    }
+
+    @Override
+    public List<String> poll(Duration timeout) {
+        List<String> result = new LinkedList<>();
+        ConsumerRecords<String, String> consumerRecords;
+        try {
+            consumerRecords = consumer.poll(timeout);
+        } catch (WakeupException we) {
+            return Collections.emptyList();
+        }
+        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
+        while (iterator.hasNext()) {
+            result.add(iterator.next().value());
+        }
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
new file mode 100644
index 0000000..7c287d2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+
+/** A Kafka external context that will create only one topic and use partitions in that topic. */
+public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
+
+    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
+    private static final long DEFAULT_TIMEOUT = 30L;
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000;
+
+    protected String bootstrapServers;
+    protected final String topicName;
+
+    private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();
+
+    protected int numSplits = 0;
+
+    private List<URL> connectorJarPaths;
+
+    protected final AdminClient kafkaAdminClient;
+
+    public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
+        this.bootstrapServers = bootstrapServers;
+        this.connectorJarPaths = connectorJarPaths;
+        this.topicName =
+                TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        kafkaAdminClient = createAdminClient();
+    }
+
+    private void createTopic(String topicName, int numPartitions, short replicationFactor) {
+        LOG.debug(
+                "Creating new Kafka topic {} with {} partitions and {} replicas",
+                topicName,
+                numPartitions,
+                replicationFactor);
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+        try {
+            kafkaAdminClient
+                    .createTopics(Collections.singletonList(newTopic))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
+        }
+    }
+
+    private void deleteTopic(String topicName) {
+        LOG.debug("Deleting Kafka topic {}", topicName);
+        try {
+            kafkaAdminClient
+                    .deleteTopics(Collections.singletonList(topicName))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
+                throw new RuntimeException(
+                        String.format("Cannot delete unknown Kafka topic '%s'", topicName), e);
+            }
+        }
+    }
+
+    private AdminClient createAdminClient() {
+        final Properties config = new Properties();
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return AdminClient.create(config);
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        if (!topicExists(topicName)) {
+            createTopic(topicName, 4, (short) 1);
+        }
+
+        KafkaSinkBuilder<String> builder = KafkaSink.builder();
+        final Properties properties = new Properties();
+        properties.put(
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT_IN_MS);
+        builder.setBootstrapServers(bootstrapServers)
+                .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
+                .setTransactionalIdPrefix("testingFramework")
+                .setKafkaProducerConfig(properties)
+                .setRecordSerializer(
+                        KafkaRecordSerializationSchema.builder()
+                                .setTopic(topicName)
+                                .setValueSerializationSchema(new SimpleStringSchema())
+                                .build());
+        return builder.build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        LOG.info("Fetching information for topic: {}", topicName);
+        final Map<String, TopicDescription> topicMetadata =
+                getTopicMetadata(Arrays.asList(topicName));
+
+        Set<TopicPartition> subscribedPartitions = new HashSet<>();
+        for (TopicDescription topic : topicMetadata.values()) {
+            for (TopicPartitionInfo partition : topic.partitions()) {
+                subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
+            }
+        }
+
+        Properties properties = new Properties();
+        properties.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "flink-kafka-test" + subscribedPartitions.hashCode());
+        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        properties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        if (EXACTLY_ONCE.equals(sinkSettings.getCheckpointingMode())) {
+            // default is read_uncommitted
+            properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        readers.add(new KafkaDataReader(properties, subscribedPartitions));
+        return readers.get(readers.size() - 1);
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<String> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+        for (int i = 0; i < recordNum; i++) {
+            int stringLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            randomStringRecords.add(RandomStringUtils.random(stringLength, true, true));
+        }
+        return randomStringRecords;
+    }
+
+    protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
+        try {
+            return kafkaAdminClient.describeTopics(topics).all().get();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Failed to get metadata for topics %s.", topics), e);
+        }
+    }
+
+    private boolean topicExists(String topic) {
+        try {
+            kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (numSplits != 0) {
+            deleteTopic(topicName);
+        }
+        readers.stream()
+                .filter(Objects::nonNull)
+                .forEach(
+                        reader -> {
+                            try {
+                                reader.close();
+                            } catch (Exception e) {
+                                if (kafkaAdminClient != null) {
+                                    kafkaAdminClient.close();
+                                }
+                                throw new RuntimeException("Cannot close split writer", e);
+                            }
+                        });
+        readers.clear();
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Single-topic Kafka";
+    }
+
+    @Override
+    public List<URL> getConnectorJarPaths() {
+        return connectorJarPaths;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+
+    private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
+        switch (checkpointingMode) {
+            case EXACTLY_ONCE:
+                return DeliveryGuarantee.EXACTLY_ONCE;
+            case AT_LEAST_ONCE:
+                return DeliveryGuarantee.AT_LEAST_ONCE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Only exactly-once and al-least-once checkpointing mode are supported, but actual is %s.",
+                                checkpointingMode));
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
new file mode 100644
index 0000000..b795854
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.KafkaContainer;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Kafka sink external context factory. */
+public class KafkaSinkExternalContextFactory
+        implements ExternalContextFactory<KafkaSinkExternalContext> {
+
+    private final KafkaContainer kafkaContainer;
+    private final List<URL> connectorJars;
+
+    public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) {
+        this.kafkaContainer = kafkaContainer;
+        this.connectorJars = connectorJars;
+    }
+
+    private String getBootstrapServer() {
+        final String internalEndpoints =
+                kafkaContainer.getNetworkAliases().stream()
+                        .map(host -> String.join(":", host, Integer.toString(9092)))
+                        .collect(Collectors.joining(","));
+        return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
+    }
+
+    @Override
+    public KafkaSinkExternalContext createExternalContext(String testName) {
+        return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars);
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index d37d55d..ec7bd0c 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -150,7 +150,6 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka</artifactId>
@@ -225,6 +224,15 @@ under the License.
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
 						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${project.version}</version>
+							<classifier>source</classifier>
+							<destFileName>flink-connector-testing.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
 							<groupId>org.apache.kafka</groupId>
 							<artifactId>kafka-clients</artifactId>
 							<version>2.8.1</version>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
new file mode 100644
index 0000000..520491e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** Kafka sink E2E test based on connector testing framework. */
+@SuppressWarnings("unused")
+public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
+    private static final String KAFKA_HOSTNAME = "kafka";
+
+    @TestSemantics
+    CheckpointingMode[] semantics =
+            new CheckpointingMode[] {
+                CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+            };
+
+    // Defines TestEnvironment
+    @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                                    .withNetworkAliases(KAFKA_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    // Defines 2 External context Factories, so test cases will be invoked twice using these two
+    // kinds of external contexts.
+    @TestContext
+    KafkaSinkExternalContextFactory contextFactory =
+            new KafkaSinkExternalContextFactory(
+                    kafka.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("kafka-connector.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("kafka-clients.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()));
+
+    public KafkaSinkE2ECase() throws Exception {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/pom.xml b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
index 13aeea7..b3c1067 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
@@ -95,4 +95,30 @@
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<shadedClassifierName>source</shadedClassifierName>
+							<artifactSet>
+								<includes>
+									<include>**/connector/testframe/source/**</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
index c5f29d1..0ecbe7f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
@@ -30,17 +30,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -161,7 +160,7 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
         }
         isStarted = false;
         this.miniCluster.after();
-        deletePath(checkpointPath);
+        FileUtils.deleteDirectory(checkpointPath.toFile());
         LOG.debug("MiniCluster has been tear down");
     }
 
@@ -180,21 +179,4 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
     public String toString() {
         return "MiniCluster";
     }
-
-    /** Deletes the given path recursively. */
-    public static void deletePath(Path path) throws IOException {
-        final List<File> files =
-                Files.walk(path)
-                        .filter(p -> p != path)
-                        .map(Path::toFile)
-                        .collect(Collectors.toList());
-        for (File file : files) {
-            if (file.isDirectory()) {
-                deletePath(file.toPath());
-            } else {
-                file.delete();
-            }
-        }
-        Files.deleteIfExists(path);
-    }
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
index 93d638c..25e638f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.testframe.external.sink;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.connector.testframe.external.ExternalContext;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
@@ -34,15 +33,6 @@ import java.util.List;
 @Experimental
 public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
 
-    /**
-     * Create an instance of {@link Sink} satisfying given options.
-     *
-     * @param sinkSettings settings of the sink
-     * @throws UnsupportedOperationException if the provided option is not supported.
-     */
-    Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
-            throws UnsupportedOperationException;
-
     /** Create a reader for consuming data written to the external system by sink. */
     ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
 
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
similarity index 53%
copy from flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
copy to flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
index 93d638c..da36a22 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
@@ -20,20 +20,14 @@ package org.apache.flink.connector.testframe.external.sink;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.testframe.external.ExternalContext;
-import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
-
-import java.util.List;
 
 /**
- * External context for DataStream sinks.
+ * External context for DataStream sinks whose version is V1.
  *
  * @param <T> Type of elements before serialization by sink
  */
 @Experimental
-public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
-
+public interface DataStreamSinkV1ExternalContext<T> extends DataStreamSinkExternalContext<T> {
     /**
      * Create an instance of {@link Sink} satisfying given options.
      *
@@ -42,23 +36,4 @@ public interface DataStreamSinkExternalContext<T> extends ExternalContext, Resul
      */
     Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
             throws UnsupportedOperationException;
-
-    /** Create a reader for consuming data written to the external system by sink. */
-    ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
-
-    /**
-     * Generate test data.
-     *
-     * <p>These test data will be sent to sink via a special source in Flink job, write to external
-     * system by sink, consume back via {@link ExternalSystemDataReader}, and make comparison with
-     * {@link T#equals(Object)} for validating correctness.
-     *
-     * <p>Make sure that the {@link T#equals(Object)} returns false when the records in different
-     * splits.
-     *
-     * @param sinkSettings settings of the sink
-     * @param seed Seed for generating random test data set.
-     * @return List of generated test data.
-     */
-    List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
new file mode 100644
index 0000000..ce2cf07
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.external.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.sink2.Sink;
+
+/**
+ * External context for DataStream sinks whose version is V2.
+ *
+ * @param <T> Type of elements before serialization by sink
+ */
+@Experimental
+public interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
+    /**
+     * Create an instance of {@link Sink} satisfying given options.
+     *
+     * @param sinkSettings settings of the sink
+     * @throws UnsupportedOperationException if the provided option is not supported.
+     */
+    Sink<T> createSink(TestingSinkSettings sinkSettings) throws UnsupportedOperationException;
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
new file mode 100644
index 0000000..8f98e2c
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * A {@link Source} implementation that reads data from a list and stops reading at the fixed
+ * position. The source will wait until the checkpoint or savepoint triggered, the source is useful
+ * for connector tests.
+ *
+ * <p>Note: This parallelism of source must be 1.
+ */
+public class FromElementsSource<OUT> implements Source<OUT, FromElementsSplit, NoOpEnumState> {
+    private Boundedness boundedness;
+
+    private List<OUT> elements;
+
+    private Integer emittedElementsNum;
+
+    public FromElementsSource(List<OUT> elements) {
+        this.elements = elements;
+    }
+
+    public FromElementsSource(
+            Boundedness boundedness, List<OUT> elements, Integer emittedElementsNum) {
+        this(elements);
+        if (emittedElementsNum != null) {
+            Preconditions.checkState(
+                    emittedElementsNum <= elements.size(),
+                    String.format(
+                            "The emittedElementsNum must not be larger than the elements list %d, but actual emittedElementsNum is %d",
+                            elements.size(), emittedElementsNum));
+        }
+        this.boundedness = boundedness;
+        this.emittedElementsNum = emittedElementsNum;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness == null ? Boundedness.BOUNDED : boundedness;
+    }
+
+    @Override
+    public SourceReader<OUT, FromElementsSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new FromElementsSourceReader<>(
+                emittedElementsNum, elements, boundedness, readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<FromElementsSplit, NoOpEnumState> createEnumerator(
+            SplitEnumeratorContext<FromElementsSplit> enumContext) throws Exception {
+        return new NoOpEnumerator();
+    }
+
+    @Override
+    public SplitEnumerator<FromElementsSplit, NoOpEnumState> restoreEnumerator(
+            SplitEnumeratorContext<FromElementsSplit> enumContext, NoOpEnumState checkpoint)
+            throws Exception {
+        return new NoOpEnumerator();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<FromElementsSplit> getSplitSerializer() {
+        return new FromElementsSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+        return new NoOpEnumStateSerializer();
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
new file mode 100644
index 0000000..e00942a
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.Counter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
+
+/**
+ * A {@link SourceReader} implementation that reads data from a list. If limitedNum is set, the
+ * reader will stop reading at the limitedNum position until the checkpoint or savepoint triggered.
+ */
+public class FromElementsSourceReader<T> implements SourceReader<T, FromElementsSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class);
+
+    private volatile int emittedNum;
+    private volatile boolean isRunning = true;
+
+    /** The context of this source reader. */
+    private SourceReaderContext context;
+
+    private Integer limitedNum;
+    private Boundedness boundedness;
+    private volatile boolean checkpointAtLimitedNum = false;
+    private List<T> elements;
+    private Counter numRecordInCounter;
+
+    public FromElementsSourceReader(
+            Integer limitedNum,
+            List<T> elements,
+            Boundedness boundedness,
+            SourceReaderContext context) {
+        this.context = context;
+        this.emittedNum = 0;
+        this.elements = elements;
+        this.limitedNum = limitedNum;
+        this.boundedness = boundedness;
+        this.numRecordInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
+        if (isRunning && emittedNum < elements.size()) {
+            /*
+             * The reader will stop reading when it has emitted `successNum` records.
+             * If and only if a checkpoint whose `numElementsEmitted` is equal to `successNum`
+             * is completed, the reader will continue reading.
+             *
+             * When we disable the checkpointing and stop with a savepoint after
+             * receiving `successNum` records, the job starting with the savepoint
+             * will continue to read the records after the `successNum` records.
+             */
+            if (limitedNum == null
+                    || (limitedNum != null
+                            && (emittedNum < limitedNum || checkpointAtLimitedNum))) {
+                output.collect(elements.get(emittedNum));
+                emittedNum++;
+                numRecordInCounter.inc();
+            }
+            return MORE_AVAILABLE;
+        }
+
+        if (Boundedness.CONTINUOUS_UNBOUNDED.equals(boundedness)) {
+            return MORE_AVAILABLE;
+        } else {
+            return InputStatus.END_OF_INPUT;
+        }
+    }
+
+    @Override
+    public List<FromElementsSplit> snapshotState(long checkpointId) {
+        if (limitedNum != null && !checkpointAtLimitedNum && emittedNum == limitedNum) {
+            checkpointAtLimitedNum = true;
+            LOG.info("checkpoint {} is the target checkpoint to be used.", checkpointId);
+        }
+        return Arrays.asList(new FromElementsSplit(emittedNum));
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void addSplits(List<FromElementsSplit> splits) {
+        emittedNum = splits.get(0).getEmitNum();
+        LOG.info("FromElementsSourceReader restores from {}.", emittedNum);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.info("checkpoint {} finished.", checkpointId);
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
new file mode 100644
index 0000000..a2bb8cc
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+/** Mock enumerator state. */
+public class NoOpEnumState {}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
new file mode 100644
index 0000000..7be0e8d
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/** Mock enumerator state seializer. */
+public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(NoOpEnumState obj) throws IOException {
+        return new byte[0];
+    }
+
+    @Override
+    public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
+        return new NoOpEnumState();
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
new file mode 100644
index 0000000..b23bfa7
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Mock enumerator. */
+public class NoOpEnumerator implements SplitEnumerator<FromElementsSplit, NoOpEnumState> {
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+    @Override
+    public void addSplitsBack(List<FromElementsSplit> splits, int subtaskId) {}
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+        return new NoOpEnumState();
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
new file mode 100644
index 0000000..d5ae8ad
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+
+/** The split of the {@link FromElementsSource}. */
+public class FromElementsSplit implements SourceSplit {
+    public static final String SPLIT_ID = "fakeSplitId";
+
+    private int emitNum;
+
+    public FromElementsSplit(int emitNum) {
+        this.emitNum = emitNum;
+    }
+
+    public int getEmitNum() {
+        return emitNum;
+    }
+
+    public void setEmitNum(int emitNum) {
+        this.emitNum = emitNum;
+    }
+
+    @Override
+    public String splitId() {
+        return SPLIT_ID;
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
new file mode 100644
index 0000000..ab32917
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** The split serializer for the {@link FromElementsSource}. */
+public class FromElementsSplitSerializer implements SimpleVersionedSerializer<FromElementsSplit> {
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(FromElementsSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(split.getEmitNum());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public FromElementsSplit deserialize(int version, byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            int emitNum = in.readInt();
+            return new FromElementsSplit(emitNum);
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
new file mode 100644
index 0000000..2c13e83
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connector.testframe.utils.MetricQuerier;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.commons.math3.util.Precision;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test DataStream connector sink.
+     *
+     * <p>The following tests will create a sink in the external system, generate a collection of
+     * test data and write them to this sink by the Flink Job.
+     *
+     * <p>In order to pass these tests, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantics. There's no requirement for records order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testBasicSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        DataStream<T> dataStream =
+                execEnv.fromCollection(testRecords)
+                        .name("sourceInSinkTest")
+                        .setParallelism(1)
+                        .returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .setParallelism(1)
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+        // Check test result
+        checkResultWithSemantic(
+                externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint. After the job has been
+     * running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a higher parallelism 4.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 4 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a lower parallelism 2.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+
+        /**
+         * The job should stop after consume a specified number of records. In order to know when
+         * the specified number of records have been consumed, a collect sink is need to be watched.
+         */
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        String savepointPath;
+        try {
+            waitForAllTaskRunning(
+                    () ->
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+            waitExpectedSizeData(iterator, numBeforeSuccess);
+
+            savepointPath =
+                    jobClient
+                            .stopWithSavepoint(
+                                    true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL)
+                            .get(30, TimeUnit.SECONDS);
+            waitForJobStatus(
+                    jobClient,
+                    Collections.singletonList(JobStatus.FINISHED),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+        } catch (Exception e) {
+            executorService.shutdown();
+            killJob(jobClient);
+            throw e;
+        }
+
+        List<T> target = testRecords.subList(0, numBeforeSuccess);
+        checkResultWithSemantic(
+                externalContext.createSinkDataReader(sinkSettings), target, semantic);
+
+        // Step 4: restart the Flink job with the savepoint
+        final StreamExecutionEnvironment restartEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .setSavepointRestorePath(savepointPath)
+                                .build());
+        restartEnv.enableCheckpointing(50);
+
+        DataStreamSource<T> restartSource =
+                restartEnv
+                        .fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "restartSource")
+                        .setParallelism(1);
+
+        DataStream<T> sinkStream = restartSource.returns(externalContext.getProducedType());
+        tryCreateSink(sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism);
+        addCollectSink(restartSource);
+        final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
+
+        try {
+            // Check the result
+            checkResultWithSemantic(
+                    externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+        } finally {
+            executorService.shutdown();
+            killJob(restartJobClient);
+            iterator.close();
+        }
+    }
+
+    /**
+     * Test connector sink metrics.
+     *
+     * <p>This test will create a sink in the external system, generate test data and write them to
+     * the sink via a Flink job. Then read and compare the metrics.
+     *
+     * <p>Now test: numRecordsOut
+     */
+    @TestTemplate
+    @DisplayName("Test sink metrics")
+    public void testMetrics(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        int parallelism = 1;
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // make sure use different names when executes multi times
+        String sinkName = "metricTestSink" + testRecords.hashCode();
+        final StreamExecutionEnvironment env =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        env.enableCheckpointing(50);
+
+        DataStreamSource<T> source =
+                env.fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "metricTestSource")
+                        .setParallelism(1);
+
+        DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .name(sinkName)
+                .setParallelism(parallelism);
+        final JobClient jobClient = env.executeAsync("Metrics Test");
+        final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            waitForAllTaskRunning(
+                    () ->
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+            waitUntilCondition(
+                    () -> {
+                        // test metrics
+                        try {
+                            return compareSinkMetrics(
+                                    queryRestClient,
+                                    testEnv,
+                                    externalContext,
+                                    jobClient.getJobID(),
+                                    sinkName,
+                                    testRecords.size());
+                        } catch (Exception e) {
+                            // skip failed assert try
+                            return false;
+                        }
+                    },
+                    Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+        } finally {
+            // Clean up
+            executorService.shutdown();
+            killJob(jobClient);
+        }
+    }
+
+    // ----------------------------- Helper Functions ---------------------------------
+
+    /**
+     * Generate a set of test records.
+     *
+     * @param testingSinkSettings sink settings
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected List<T> generateTestData(
+            TestingSinkSettings testingSinkSettings,
+            DataStreamSinkExternalContext<T> externalContext) {
+        return externalContext.generateTestData(
+                testingSinkSettings, ThreadLocalRandom.current().nextLong());
+    }
+
+    /**
+     * Poll records from the sink.
+     *
+     * @param result Append records to which list
+     * @param reader The sink reader
+     * @param expected The expected list which help to stop polling
+     * @param retryTimes The retry times
+     * @param semantic The semantic
+     * @return Collection of records in the Sink
+     */
+    private List<T> pollAndAppendResultData(
+            List<T> result,
+            ExternalSystemDataReader<T> reader,
+            List<T> expected,
+            int retryTimes,
+            CheckpointingMode semantic) {
+        long timeoutMs = 1000L;
+        int retryIndex = 0;
+
+        while (retryIndex++ < retryTimes
+                && !checkGetEnoughRecordsWithSemantic(expected, result, semantic)) {
+            result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+        }
+        return result;
+    }
+
+    /**
+     * Check whether the polling should stop.
+     *
+     * @param expected The expected list which help to stop polling
+     * @param result The records that have been read
+     * @param semantic The semantic
+     * @return Whether the polling should stop
+     */
+    private boolean checkGetEnoughRecordsWithSemantic(
+            List<T> expected, List<T> result, CheckpointingMode semantic) {
+        checkNotNull(expected);
+        checkNotNull(result);
+        if (EXACTLY_ONCE.equals(semantic)) {
+            return expected.size() <= result.size();
+        } else if (AT_LEAST_ONCE.equals(semantic)) {
+            Set<Integer> matchedIndex = new HashSet<>();
+            for (T record : expected) {
+                int before = matchedIndex.size();
+                for (int i = 0; i < result.size(); i++) {
+                    if (matchedIndex.contains(i)) {
+                        continue;
+                    }
+                    if (record.equals(result.get(i))) {
+                        matchedIndex.add(i);
+                        break;
+                    }
+                }
+                // if not find the record in the result
+                if (before == matchedIndex.size()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        throw new IllegalStateException(
+                String.format("%s delivery guarantee doesn't support test.", semantic.name()));
+    }
+
+    /**
+     * Compare the test data with actual data in given semantic.
+     *
+     * @param reader the data reader for the sink
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     */
+    private void checkResultWithSemantic(
+            ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+            throws Exception {
+        final ArrayList<T> result = new ArrayList<>();
+        waitUntilCondition(
+                () -> {
+                    pollAndAppendResultData(result, reader, testData, 30, semantic);
+                    try {
+                        CollectIteratorAssertions.assertThat(sort(result).iterator())
+                                .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
+                        return true;
+                    } catch (Throwable t) {
+                        return false;
+                    }
+                },
+                Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+    }
+
+    /** Compare the metrics. */
+    private boolean compareSinkMetrics(
+            MetricQuerier metricQuerier,
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> context,
+            JobID jobId,
+            String sinkName,
+            long allRecordSize)
+            throws Exception {
+        double sumNumRecordsOut =
+                metricQuerier.getAggregatedMetricsByRestAPI(
+                        testEnv.getRestEndpoint(),
+                        jobId,
+                        sinkName,
+                        MetricNames.IO_NUM_RECORDS_OUT,
+                        getSinkMetricFilter(context));
+        return Precision.equals(allRecordSize, sumNumRecordsOut);
+    }
+
+    /** Sort the list. */
+    private List<T> sort(List<T> list) {
+        return list.stream().sorted().collect(Collectors.toList());
+    }
+
+    private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
+        return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build();
+    }
+
+    private void killJob(JobClient jobClient) throws Exception {
+        terminateJob(jobClient);
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+    }
+
+    private DataStreamSink<T> tryCreateSink(
+            DataStream<T> dataStream,
+            DataStreamSinkExternalContext<T> context,
+            TestingSinkSettings sinkSettings) {
+        try {
+            if (context instanceof DataStreamSinkV1ExternalContext) {
+                org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sinkV1 =
+                        ((DataStreamSinkV1ExternalContext<T>) context).createSink(sinkSettings);
+                return dataStream.sinkTo(sinkV1);
+            } else if (context instanceof DataStreamSinkV2ExternalContext) {
+                Sink<T> sinkV2 =
+                        ((DataStreamSinkV2ExternalContext<T>) context).createSink(sinkSettings);
+                return dataStream.sinkTo(sinkV2);
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.",
+                                context.getClass()));
+            }
+        } catch (UnsupportedOperationException e) {
+            // abort the test
+            throw new TestAbortedException("Cannot create a sink satisfying given options.", e);
+        }
+    }
+
+    /**
+     * Return the filter used to filter the sink metric.
+     *
+     * <ul>
+     *   <li>Sink v1: return null.
+     *   <li>Sink v2: return the "Writer" prefix in the `SinkTransformationTranslator`.
+     * </ul>
+     */
+    private String getSinkMetricFilter(DataStreamSinkExternalContext<T> context) {
+        if (context instanceof DataStreamSinkV1ExternalContext) {
+            return null;
+        } else if (context instanceof DataStreamSinkV2ExternalContext) {
+            // See class `SinkTransformationTranslator`
+            return "Writer";
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Get unexpected sink context: %s", context.getClass()));
+        }
+    }
+
+    protected CollectResultIterator<T> addCollectSink(DataStream<T> stream) {
+        TypeSerializer<T> serializer =
+                stream.getType().createSerializer(stream.getExecutionConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
+        sink.name("Data stream collect sink");
+        stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+        return new CollectResultIterator<>(
+                operator.getOperatorIdFuture(),
+                serializer,
+                accumulatorName,
+                stream.getExecutionEnvironment().getCheckpointConfig());
+    }
+
+    private void waitExpectedSizeData(CollectResultIterator<T> iterator, int targetNum) {
+        assertThat(
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    int count = 0;
+                                    while (count < targetNum && iterator.hasNext()) {
+                                        iterator.next();
+                                        count++;
+                                    }
+                                    if (count < targetNum) {
+                                        throw new IllegalStateException(
+                                                String.format(
+                                                        "Fail to get %d records.", targetNum));
+                                    }
+                                    return true;
+                                }))
+                .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 992e12a..a90d05b 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.connector.testframe.utils.MetricQuerier;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -70,11 +71,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
 import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
@@ -447,11 +451,14 @@ public abstract class SourceTestSuiteBase<T> {
         final JobClient jobClient = env.executeAsync("Metrics Test");
 
         final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+        final ExecutorService executorService = Executors.newCachedThreadPool();
         try {
             waitForAllTaskRunning(
                     () ->
-                            queryRestClient.getJobDetails(
-                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
                     Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
 
             waitUntilCondition(
@@ -472,6 +479,7 @@ public abstract class SourceTestSuiteBase<T> {
                     Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
         } finally {
             // Clean up
+            executorService.shutdown();
             killJob(jobClient);
         }
     }
@@ -775,7 +783,8 @@ public abstract class SourceTestSuiteBase<T> {
                         testEnv.getRestEndpoint(),
                         jobId,
                         sourceName,
-                        MetricNames.IO_NUM_RECORDS_IN);
+                        MetricNames.IO_NUM_RECORDS_IN,
+                        null);
         return Precision.equals(allRecordSize, sumNumRecordsIn);
     }
 
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
index 9fe13c1..d749132 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
@@ -40,6 +41,9 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -56,16 +60,15 @@ public class MetricQuerier {
         restClient = new RestClient(configuration, Executors.newCachedThreadPool());
     }
 
-    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)
-            throws Exception {
+    public static JobDetailsInfo getJobDetails(
+            RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception {
         String jmAddress = endpoint.getAddress();
         int jmPort = endpoint.getPort();
 
         final JobMessageParameters params = new JobMessageParameters();
         params.jobPathParameter.resolve(jobId);
 
-        return restClient
-                .sendRequest(
+        return client.sendRequest(
                         jmAddress,
                         jmPort,
                         JobDetailsHeaders.getInstance(),
@@ -120,10 +123,11 @@ public class MetricQuerier {
             TestEnvironment.Endpoint endpoint,
             JobID jobId,
             String sourceOrSinkName,
-            String metricName)
+            String metricName,
+            String filter)
             throws Exception {
         // get job details, including the vertex id
-        JobDetailsInfo jobDetailsInfo = getJobDetails(endpoint, jobId);
+        JobDetailsInfo jobDetailsInfo = getJobDetails(restClient, endpoint, jobId);
 
         // get the vertex id for source/sink operator
         JobDetailsInfo.JobVertexDetailsInfo vertex =
@@ -143,8 +147,8 @@ public class MetricQuerier {
                 metricsResponseBody.getMetrics().stream()
                         .filter(
                                 m ->
-                                        m.getId().endsWith(metricName)
-                                                && m.getId().contains(sourceOrSinkName))
+                                        filterByMetricName(
+                                                m.getId(), sourceOrSinkName, metricName, filter))
                         .map(m -> m.getId())
                         .collect(Collectors.joining(","));
 
@@ -157,6 +161,27 @@ public class MetricQuerier {
 
         AggregatedMetricsResponseBody metricsResponse =
                 getMetrics(endpoint, jobId, vertexId, queryParam);
-        return metricsResponse.getMetrics().iterator().next().getSum();
+
+        Collection<AggregatedMetric> metrics = metricsResponse.getMetrics();
+        if (metrics == null || metrics.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot find metric[%s] for operator [%s] with filter [%s].",
+                            metricName, sourceOrSinkName, filter));
+        }
+        return metrics.iterator().next().getSum();
+    }
+
+    private boolean filterByMetricName(
+            String metricName,
+            String sourceOrSinkName,
+            String targetMetricName,
+            @Nullable String filter) {
+        boolean filterByName =
+                metricName.endsWith(targetMetricName) && metricName.contains(sourceOrSinkName);
+        if (!StringUtils.isNullOrWhitespaceOnly(filter)) {
+            return filterByName && metricName.contains(filter);
+        }
+        return filterByName;
     }
 }