You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/02/16 01:31:55 UTC

[flink] branch master updated (6222532 -> 3106f14)

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6222532  [FLINK-24474] Default rest.bind-address to localhost in flink-conf.yaml
     new 57e3f03  [FLINK-25289][tests] Introduce sink test suite in connector test framework
     new 3106f14  [hotfix][tests][connector/kafka] Disable KafkaSink metric tests until FLINK-26126 has been fixed

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/kafka/sink/KafkaSinkITCase.java      |  55 ++
 .../kafka/sink/testutils/KafkaDataReader.java      |  69 +++
 .../sink/testutils/KafkaSinkExternalContext.java   | 272 +++++++++
 .../testutils/KafkaSinkExternalContextFactory.java |  53 ++
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  10 +-
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   |  96 +++
 .../flink-connector-test-utils/pom.xml             |  26 +
 .../environment/MiniClusterTestEnvironment.java    |  22 +-
 .../sink/DataStreamSinkExternalContext.java        |  10 -
 ...t.java => DataStreamSinkV1ExternalContext.java} |  29 +-
 .../sink/DataStreamSinkV2ExternalContext.java      |  38 ++
 .../testframe/source/FromElementsSource.java       | 103 ++++
 .../testframe/source/FromElementsSourceReader.java | 134 +++++
 .../testframe/source/enumerator/NoOpEnumState.java |  22 +
 .../source/enumerator/NoOpEnumStateSerializer.java |  41 ++
 .../source/enumerator/NoOpEnumerator.java          |  50 ++
 .../testframe/source/split/FromElementsSplit.java  |  46 ++
 .../source/split/FromElementsSplitSerializer.java  |  55 ++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 643 +++++++++++++++++++++
 .../testframe/testsuites/SourceTestSuiteBase.java  |  15 +-
 .../connector/testframe/utils/MetricQuerier.java   |  43 +-
 21 files changed, 1762 insertions(+), 70 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
 create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
 copy flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/{DataStreamSinkExternalContext.java => DataStreamSinkV1ExternalContext.java} (53%)
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
 create mode 100644 flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java

[flink] 01/02: [FLINK-25289][tests] Introduce sink test suite in connector test framework

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57e3f03ccd719ed772c983ba335517d95f8f3e6a
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Tue Feb 15 20:15:21 2022 +0800

    [FLINK-25289][tests] Introduce sink test suite in connector test framework
    
    This closes #18496.
---
 .../connector/kafka/sink/KafkaSinkITCase.java      |  46 ++
 .../kafka/sink/testutils/KafkaDataReader.java      |  69 +++
 .../sink/testutils/KafkaSinkExternalContext.java   | 272 +++++++++
 .../testutils/KafkaSinkExternalContextFactory.java |  53 ++
 .../flink-end-to-end-tests-common-kafka/pom.xml    |  10 +-
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   |  87 +++
 .../flink-connector-test-utils/pom.xml             |  26 +
 .../environment/MiniClusterTestEnvironment.java    |  22 +-
 .../sink/DataStreamSinkExternalContext.java        |  10 -
 ...t.java => DataStreamSinkV1ExternalContext.java} |  29 +-
 .../sink/DataStreamSinkV2ExternalContext.java      |  38 ++
 .../testframe/source/FromElementsSource.java       | 103 ++++
 .../testframe/source/FromElementsSourceReader.java | 134 +++++
 .../testframe/source/enumerator/NoOpEnumState.java |  22 +
 .../source/enumerator/NoOpEnumStateSerializer.java |  41 ++
 .../source/enumerator/NoOpEnumerator.java          |  50 ++
 .../testframe/source/split/FromElementsSplit.java  |  46 ++
 .../source/split/FromElementsSplitSerializer.java  |  55 ++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 643 +++++++++++++++++++++
 .../testframe/testsuites/SourceTestSuiteBase.java  |  15 +-
 .../connector/testframe/utils/MetricQuerier.java   |  43 +-
 21 files changed, 1744 insertions(+), 70 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 2a35112..4234f7a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -31,12 +31,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
@@ -69,11 +81,15 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
 
 import javax.annotation.Nullable;
 
@@ -161,6 +177,36 @@ public class KafkaSinkITCase extends TestLogger {
         deleteTestTopic(topic);
     }
 
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTests extends SinkTestSuiteBase<String> {
+        // Defines test environment on Flink MiniCluster
+        @SuppressWarnings("unused")
+        @TestEnv
+        MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        // Defines external system
+        @TestExternalSystem
+        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+                DefaultContainerizedExternalSystem.builder()
+                        .fromContainer(
+                                new KafkaContainer(
+                                        DockerImageName.parse(DockerImageVersions.KAFKA)))
+                        .build();
+
+        @SuppressWarnings("unused")
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @SuppressWarnings("unused")
+        @TestContext
+        KafkaSinkExternalContextFactory sinkContext =
+                new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
+    }
+
     @Test
     public void testWriteRecordsToKafkaWithAtLeastOnceGuarantee() throws Exception {
         writeRecordsToKafka(DeliveryGuarantee.AT_LEAST_ONCE, emittedRecordsCount);
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
new file mode 100644
index 0000000..0847cd4
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaDataReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/** Kafka dataStream data reader. */
+public class KafkaDataReader implements ExternalSystemDataReader<String> {
+    private final KafkaConsumer<String, String> consumer;
+
+    public KafkaDataReader(Properties properties, Collection<TopicPartition> partitions) {
+        this.consumer = new KafkaConsumer<>(properties);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+    }
+
+    @Override
+    public List<String> poll(Duration timeout) {
+        List<String> result = new LinkedList<>();
+        ConsumerRecords<String, String> consumerRecords;
+        try {
+            consumerRecords = consumer.poll(timeout);
+        } catch (WakeupException we) {
+            return Collections.emptyList();
+        }
+        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
+        while (iterator.hasNext()) {
+            result.add(iterator.next().value());
+        }
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
new file mode 100644
index 0000000..7c287d2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+
+/** A Kafka external context that will create only one topic and use partitions in that topic. */
+public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
+
+    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
+    private static final long DEFAULT_TIMEOUT = 30L;
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000;
+
+    protected String bootstrapServers;
+    protected final String topicName;
+
+    private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();
+
+    protected int numSplits = 0;
+
+    private List<URL> connectorJarPaths;
+
+    protected final AdminClient kafkaAdminClient;
+
+    public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
+        this.bootstrapServers = bootstrapServers;
+        this.connectorJarPaths = connectorJarPaths;
+        this.topicName =
+                TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        kafkaAdminClient = createAdminClient();
+    }
+
+    private void createTopic(String topicName, int numPartitions, short replicationFactor) {
+        LOG.debug(
+                "Creating new Kafka topic {} with {} partitions and {} replicas",
+                topicName,
+                numPartitions,
+                replicationFactor);
+        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+        try {
+            kafkaAdminClient
+                    .createTopics(Collections.singletonList(newTopic))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
+        }
+    }
+
+    private void deleteTopic(String topicName) {
+        LOG.debug("Deleting Kafka topic {}", topicName);
+        try {
+            kafkaAdminClient
+                    .deleteTopics(Collections.singletonList(topicName))
+                    .all()
+                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
+                throw new RuntimeException(
+                        String.format("Cannot delete unknown Kafka topic '%s'", topicName), e);
+            }
+        }
+    }
+
+    private AdminClient createAdminClient() {
+        final Properties config = new Properties();
+        config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return AdminClient.create(config);
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        if (!topicExists(topicName)) {
+            createTopic(topicName, 4, (short) 1);
+        }
+
+        KafkaSinkBuilder<String> builder = KafkaSink.builder();
+        final Properties properties = new Properties();
+        properties.put(
+                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT_IN_MS);
+        builder.setBootstrapServers(bootstrapServers)
+                .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
+                .setTransactionalIdPrefix("testingFramework")
+                .setKafkaProducerConfig(properties)
+                .setRecordSerializer(
+                        KafkaRecordSerializationSchema.builder()
+                                .setTopic(topicName)
+                                .setValueSerializationSchema(new SimpleStringSchema())
+                                .build());
+        return builder.build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        LOG.info("Fetching information for topic: {}", topicName);
+        final Map<String, TopicDescription> topicMetadata =
+                getTopicMetadata(Arrays.asList(topicName));
+
+        Set<TopicPartition> subscribedPartitions = new HashSet<>();
+        for (TopicDescription topic : topicMetadata.values()) {
+            for (TopicPartitionInfo partition : topic.partitions()) {
+                subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
+            }
+        }
+
+        Properties properties = new Properties();
+        properties.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "flink-kafka-test" + subscribedPartitions.hashCode());
+        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        properties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        if (EXACTLY_ONCE.equals(sinkSettings.getCheckpointingMode())) {
+            // default is read_uncommitted
+            properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        readers.add(new KafkaDataReader(properties, subscribedPartitions));
+        return readers.get(readers.size() - 1);
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<String> randomStringRecords = new ArrayList<>();
+        int recordNum =
+                random.nextInt(NUM_RECORDS_UPPER_BOUND - NUM_RECORDS_LOWER_BOUND)
+                        + NUM_RECORDS_LOWER_BOUND;
+        for (int i = 0; i < recordNum; i++) {
+            int stringLength = random.nextInt(RANDOM_STRING_MAX_LENGTH) + 1;
+            randomStringRecords.add(RandomStringUtils.random(stringLength, true, true));
+        }
+        return randomStringRecords;
+    }
+
+    protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
+        try {
+            return kafkaAdminClient.describeTopics(topics).all().get();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Failed to get metadata for topics %s.", topics), e);
+        }
+    }
+
+    private boolean topicExists(String topic) {
+        try {
+            kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (numSplits != 0) {
+            deleteTopic(topicName);
+        }
+        readers.stream()
+                .filter(Objects::nonNull)
+                .forEach(
+                        reader -> {
+                            try {
+                                reader.close();
+                            } catch (Exception e) {
+                                if (kafkaAdminClient != null) {
+                                    kafkaAdminClient.close();
+                                }
+                                throw new RuntimeException("Cannot close split writer", e);
+                            }
+                        });
+        readers.clear();
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Single-topic Kafka";
+    }
+
+    @Override
+    public List<URL> getConnectorJarPaths() {
+        return connectorJarPaths;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+
+    private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
+        switch (checkpointingMode) {
+            case EXACTLY_ONCE:
+                return DeliveryGuarantee.EXACTLY_ONCE;
+            case AT_LEAST_ONCE:
+                return DeliveryGuarantee.AT_LEAST_ONCE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Only exactly-once and al-least-once checkpointing mode are supported, but actual is %s.",
+                                checkpointingMode));
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
new file mode 100644
index 0000000..b795854
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.testutils;
+
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+
+import org.testcontainers.containers.KafkaContainer;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Kafka sink external context factory. */
+public class KafkaSinkExternalContextFactory
+        implements ExternalContextFactory<KafkaSinkExternalContext> {
+
+    private final KafkaContainer kafkaContainer;
+    private final List<URL> connectorJars;
+
+    public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) {
+        this.kafkaContainer = kafkaContainer;
+        this.connectorJars = connectorJars;
+    }
+
+    private String getBootstrapServer() {
+        final String internalEndpoints =
+                kafkaContainer.getNetworkAliases().stream()
+                        .map(host -> String.join(":", host, Integer.toString(9092)))
+                        .collect(Collectors.joining(","));
+        return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
+    }
+
+    @Override
+    public KafkaSinkExternalContext createExternalContext(String testName) {
+        return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars);
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
index d37d55d..ec7bd0c 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -150,7 +150,6 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka</artifactId>
@@ -225,6 +224,15 @@ under the License.
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
 						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${project.version}</version>
+							<classifier>source</classifier>
+							<destFileName>flink-connector-testing.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
 							<groupId>org.apache.kafka</groupId>
 							<artifactId>kafka-clients</artifactId>
 							<version>2.8.1</version>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
new file mode 100644
index 0000000..520491e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Arrays;
+
+/** Kafka sink E2E test based on connector testing framework. */
+@SuppressWarnings("unused")
+public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
+    private static final String KAFKA_HOSTNAME = "kafka";
+
+    @TestSemantics
+    CheckpointingMode[] semantics =
+            new CheckpointingMode[] {
+                CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+            };
+
+    // Defines TestEnvironment
+    @TestEnv FlinkContainerTestEnvironment flink = new FlinkContainerTestEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem
+    @TestExternalSystem
+    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+            DefaultContainerizedExternalSystem.builder()
+                    .fromContainer(
+                            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+                                    .withNetworkAliases(KAFKA_HOSTNAME))
+                    .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
+                    .build();
+
+    // Defines 2 External context Factories, so test cases will be invoked twice using these two
+    // kinds of external contexts.
+    @TestContext
+    KafkaSinkExternalContextFactory contextFactory =
+            new KafkaSinkExternalContextFactory(
+                    kafka.getContainer(),
+                    Arrays.asList(
+                            TestUtils.getResource("kafka-connector.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("kafka-clients.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            TestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()));
+
+    public KafkaSinkE2ECase() throws Exception {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/pom.xml b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
index 13aeea7..b3c1067 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-connector-test-utils/pom.xml
@@ -95,4 +95,30 @@
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<shadedClassifierName>source</shadedClassifierName>
+							<artifactSet>
+								<includes>
+									<include>**/connector/testframe/source/**</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
index c5f29d1..0ecbe7f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
@@ -30,17 +30,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -161,7 +160,7 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
         }
         isStarted = false;
         this.miniCluster.after();
-        deletePath(checkpointPath);
+        FileUtils.deleteDirectory(checkpointPath.toFile());
         LOG.debug("MiniCluster has been tear down");
     }
 
@@ -180,21 +179,4 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr
     public String toString() {
         return "MiniCluster";
     }
-
-    /** Deletes the given path recursively. */
-    public static void deletePath(Path path) throws IOException {
-        final List<File> files =
-                Files.walk(path)
-                        .filter(p -> p != path)
-                        .map(Path::toFile)
-                        .collect(Collectors.toList());
-        for (File file : files) {
-            if (file.isDirectory()) {
-                deletePath(file.toPath());
-            } else {
-                file.delete();
-            }
-        }
-        Files.deleteIfExists(path);
-    }
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
index 93d638c..25e638f 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.testframe.external.sink;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.connector.testframe.external.ExternalContext;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
@@ -34,15 +33,6 @@ import java.util.List;
 @Experimental
 public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
 
-    /**
-     * Create an instance of {@link Sink} satisfying given options.
-     *
-     * @param sinkSettings settings of the sink
-     * @throws UnsupportedOperationException if the provided option is not supported.
-     */
-    Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
-            throws UnsupportedOperationException;
-
     /** Create a reader for consuming data written to the external system by sink. */
     ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
 
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
similarity index 53%
copy from flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
copy to flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
index 93d638c..da36a22 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkExternalContext.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV1ExternalContext.java
@@ -20,20 +20,14 @@ package org.apache.flink.connector.testframe.external.sink;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.connector.testframe.external.ExternalContext;
-import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
-
-import java.util.List;
 
 /**
- * External context for DataStream sinks.
+ * External context for DataStream sinks whose version is V1.
  *
  * @param <T> Type of elements before serialization by sink
  */
 @Experimental
-public interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {
-
+public interface DataStreamSinkV1ExternalContext<T> extends DataStreamSinkExternalContext<T> {
     /**
      * Create an instance of {@link Sink} satisfying given options.
      *
@@ -42,23 +36,4 @@ public interface DataStreamSinkExternalContext<T> extends ExternalContext, Resul
      */
     Sink<T, ?, ?, ?> createSink(TestingSinkSettings sinkSettings)
             throws UnsupportedOperationException;
-
-    /** Create a reader for consuming data written to the external system by sink. */
-    ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);
-
-    /**
-     * Generate test data.
-     *
-     * <p>These test data will be sent to sink via a special source in Flink job, write to external
-     * system by sink, consume back via {@link ExternalSystemDataReader}, and make comparison with
-     * {@link T#equals(Object)} for validating correctness.
-     *
-     * <p>Make sure that the {@link T#equals(Object)} returns false when the records in different
-     * splits.
-     *
-     * @param sinkSettings settings of the sink
-     * @param seed Seed for generating random test data set.
-     * @return List of generated test data.
-     */
-    List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
new file mode 100644
index 0000000..ce2cf07
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/DataStreamSinkV2ExternalContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.external.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.sink2.Sink;
+
+/**
+ * External context for DataStream sinks whose version is V2.
+ *
+ * @param <T> Type of elements before serialization by sink
+ */
+@Experimental
+public interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {
+    /**
+     * Create an instance of {@link Sink} satisfying given options.
+     *
+     * @param sinkSettings settings of the sink
+     * @throws UnsupportedOperationException if the provided option is not supported.
+     */
+    Sink<T> createSink(TestingSinkSettings sinkSettings) throws UnsupportedOperationException;
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
new file mode 100644
index 0000000..8f98e2c
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.testframe.source.enumerator.NoOpEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * A {@link Source} implementation that reads data from a list and stops reading at the fixed
+ * position. The source will wait until the checkpoint or savepoint triggered, the source is useful
+ * for connector tests.
+ *
+ * <p>Note: This parallelism of source must be 1.
+ */
+public class FromElementsSource<OUT> implements Source<OUT, FromElementsSplit, NoOpEnumState> {
+    private Boundedness boundedness;
+
+    private List<OUT> elements;
+
+    private Integer emittedElementsNum;
+
+    public FromElementsSource(List<OUT> elements) {
+        this.elements = elements;
+    }
+
+    public FromElementsSource(
+            Boundedness boundedness, List<OUT> elements, Integer emittedElementsNum) {
+        this(elements);
+        if (emittedElementsNum != null) {
+            Preconditions.checkState(
+                    emittedElementsNum <= elements.size(),
+                    String.format(
+                            "The emittedElementsNum must not be larger than the elements list %d, but actual emittedElementsNum is %d",
+                            elements.size(), emittedElementsNum));
+        }
+        this.boundedness = boundedness;
+        this.emittedElementsNum = emittedElementsNum;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness == null ? Boundedness.BOUNDED : boundedness;
+    }
+
+    @Override
+    public SourceReader<OUT, FromElementsSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new FromElementsSourceReader<>(
+                emittedElementsNum, elements, boundedness, readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<FromElementsSplit, NoOpEnumState> createEnumerator(
+            SplitEnumeratorContext<FromElementsSplit> enumContext) throws Exception {
+        return new NoOpEnumerator();
+    }
+
+    @Override
+    public SplitEnumerator<FromElementsSplit, NoOpEnumState> restoreEnumerator(
+            SplitEnumeratorContext<FromElementsSplit> enumContext, NoOpEnumState checkpoint)
+            throws Exception {
+        return new NoOpEnumerator();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<FromElementsSplit> getSplitSerializer() {
+        return new FromElementsSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+        return new NoOpEnumStateSerializer();
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
new file mode 100644
index 0000000..e00942a
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/FromElementsSourceReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.Counter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
+
+/**
+ * A {@link SourceReader} implementation that reads data from a list. If limitedNum is set, the
+ * reader will stop reading at the limitedNum position until the checkpoint or savepoint triggered.
+ */
+public class FromElementsSourceReader<T> implements SourceReader<T, FromElementsSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class);
+
+    private volatile int emittedNum;
+    private volatile boolean isRunning = true;
+
+    /** The context of this source reader. */
+    private SourceReaderContext context;
+
+    private Integer limitedNum;
+    private Boundedness boundedness;
+    private volatile boolean checkpointAtLimitedNum = false;
+    private List<T> elements;
+    private Counter numRecordInCounter;
+
+    public FromElementsSourceReader(
+            Integer limitedNum,
+            List<T> elements,
+            Boundedness boundedness,
+            SourceReaderContext context) {
+        this.context = context;
+        this.emittedNum = 0;
+        this.elements = elements;
+        this.limitedNum = limitedNum;
+        this.boundedness = boundedness;
+        this.numRecordInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
+        if (isRunning && emittedNum < elements.size()) {
+            /*
+             * The reader will stop reading when it has emitted `successNum` records.
+             * If and only if a checkpoint whose `numElementsEmitted` is equal to `successNum`
+             * is completed, the reader will continue reading.
+             *
+             * When we disable the checkpointing and stop with a savepoint after
+             * receiving `successNum` records, the job starting with the savepoint
+             * will continue to read the records after the `successNum` records.
+             */
+            if (limitedNum == null
+                    || (limitedNum != null
+                            && (emittedNum < limitedNum || checkpointAtLimitedNum))) {
+                output.collect(elements.get(emittedNum));
+                emittedNum++;
+                numRecordInCounter.inc();
+            }
+            return MORE_AVAILABLE;
+        }
+
+        if (Boundedness.CONTINUOUS_UNBOUNDED.equals(boundedness)) {
+            return MORE_AVAILABLE;
+        } else {
+            return InputStatus.END_OF_INPUT;
+        }
+    }
+
+    @Override
+    public List<FromElementsSplit> snapshotState(long checkpointId) {
+        if (limitedNum != null && !checkpointAtLimitedNum && emittedNum == limitedNum) {
+            checkpointAtLimitedNum = true;
+            LOG.info("checkpoint {} is the target checkpoint to be used.", checkpointId);
+        }
+        return Arrays.asList(new FromElementsSplit(emittedNum));
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public void addSplits(List<FromElementsSplit> splits) {
+        emittedNum = splits.get(0).getEmitNum();
+        LOG.info("FromElementsSourceReader restores from {}.", emittedNum);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {}
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.info("checkpoint {} finished.", checkpointId);
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
new file mode 100644
index 0000000..a2bb8cc
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+/** Mock enumerator state. */
+public class NoOpEnumState {}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
new file mode 100644
index 0000000..7be0e8d
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/** Mock enumerator state seializer. */
+public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(NoOpEnumState obj) throws IOException {
+        return new byte[0];
+    }
+
+    @Override
+    public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
+        return new NoOpEnumState();
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
new file mode 100644
index 0000000..b23bfa7
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Mock enumerator. */
+public class NoOpEnumerator implements SplitEnumerator<FromElementsSplit, NoOpEnumState> {
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+    @Override
+    public void addSplitsBack(List<FromElementsSplit> splits, int subtaskId) {}
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+        return new NoOpEnumState();
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
new file mode 100644
index 0000000..d5ae8ad
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+
+/** The split of the {@link FromElementsSource}. */
+public class FromElementsSplit implements SourceSplit {
+    public static final String SPLIT_ID = "fakeSplitId";
+
+    private int emitNum;
+
+    public FromElementsSplit(int emitNum) {
+        this.emitNum = emitNum;
+    }
+
+    public int getEmitNum() {
+        return emitNum;
+    }
+
+    public void setEmitNum(int emitNum) {
+        this.emitNum = emitNum;
+    }
+
+    @Override
+    public String splitId() {
+        return SPLIT_ID;
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
new file mode 100644
index 0000000..ab32917
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/split/FromElementsSplitSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.split;
+
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** The split serializer for the {@link FromElementsSource}. */
+public class FromElementsSplitSerializer implements SimpleVersionedSerializer<FromElementsSplit> {
+    @Override
+    public int getVersion() {
+        return 0;
+    }
+
+    @Override
+    public byte[] serialize(FromElementsSplit split) throws IOException {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(split.getEmitNum());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public FromElementsSplit deserialize(int version, byte[] serialized) throws IOException {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            int emitNum = in.readInt();
+            return new FromElementsSplit(emitNum);
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
new file mode 100644
index 0000000..2c13e83
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connector.testframe.utils.MetricQuerier;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.commons.math3.util.Precision;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test DataStream connector sink.
+     *
+     * <p>The following tests will create a sink in the external system, generate a collection of
+     * test data and write them to this sink by the Flink Job.
+     *
+     * <p>In order to pass these tests, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantics. There's no requirement for records order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testBasicSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        DataStream<T> dataStream =
+                execEnv.fromCollection(testRecords)
+                        .name("sourceInSinkTest")
+                        .setParallelism(1)
+                        .returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .setParallelism(1)
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+        // Check test result
+        checkResultWithSemantic(
+                externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint. After the job has been
+     * running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a higher parallelism 4.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 4 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a lower parallelism 2.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+
+        /**
+         * The job should stop after consume a specified number of records. In order to know when
+         * the specified number of records have been consumed, a collect sink is need to be watched.
+         */
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        String savepointPath;
+        try {
+            waitForAllTaskRunning(
+                    () ->
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+            waitExpectedSizeData(iterator, numBeforeSuccess);
+
+            savepointPath =
+                    jobClient
+                            .stopWithSavepoint(
+                                    true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL)
+                            .get(30, TimeUnit.SECONDS);
+            waitForJobStatus(
+                    jobClient,
+                    Collections.singletonList(JobStatus.FINISHED),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+        } catch (Exception e) {
+            executorService.shutdown();
+            killJob(jobClient);
+            throw e;
+        }
+
+        List<T> target = testRecords.subList(0, numBeforeSuccess);
+        checkResultWithSemantic(
+                externalContext.createSinkDataReader(sinkSettings), target, semantic);
+
+        // Step 4: restart the Flink job with the savepoint
+        final StreamExecutionEnvironment restartEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .setSavepointRestorePath(savepointPath)
+                                .build());
+        restartEnv.enableCheckpointing(50);
+
+        DataStreamSource<T> restartSource =
+                restartEnv
+                        .fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "restartSource")
+                        .setParallelism(1);
+
+        DataStream<T> sinkStream = restartSource.returns(externalContext.getProducedType());
+        tryCreateSink(sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism);
+        addCollectSink(restartSource);
+        final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
+
+        try {
+            // Check the result
+            checkResultWithSemantic(
+                    externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+        } finally {
+            executorService.shutdown();
+            killJob(restartJobClient);
+            iterator.close();
+        }
+    }
+
+    /**
+     * Test connector sink metrics.
+     *
+     * <p>This test will create a sink in the external system, generate test data and write them to
+     * the sink via a Flink job. Then read and compare the metrics.
+     *
+     * <p>Now test: numRecordsOut
+     */
+    @TestTemplate
+    @DisplayName("Test sink metrics")
+    public void testMetrics(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        int parallelism = 1;
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // make sure use different names when executes multi times
+        String sinkName = "metricTestSink" + testRecords.hashCode();
+        final StreamExecutionEnvironment env =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        env.enableCheckpointing(50);
+
+        DataStreamSource<T> source =
+                env.fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "metricTestSource")
+                        .setParallelism(1);
+
+        DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .name(sinkName)
+                .setParallelism(parallelism);
+        final JobClient jobClient = env.executeAsync("Metrics Test");
+        final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            waitForAllTaskRunning(
+                    () ->
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
+                    Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+            waitUntilCondition(
+                    () -> {
+                        // test metrics
+                        try {
+                            return compareSinkMetrics(
+                                    queryRestClient,
+                                    testEnv,
+                                    externalContext,
+                                    jobClient.getJobID(),
+                                    sinkName,
+                                    testRecords.size());
+                        } catch (Exception e) {
+                            // skip failed assert try
+                            return false;
+                        }
+                    },
+                    Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+        } finally {
+            // Clean up
+            executorService.shutdown();
+            killJob(jobClient);
+        }
+    }
+
+    // ----------------------------- Helper Functions ---------------------------------
+
+    /**
+     * Generate a set of test records.
+     *
+     * @param testingSinkSettings sink settings
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected List<T> generateTestData(
+            TestingSinkSettings testingSinkSettings,
+            DataStreamSinkExternalContext<T> externalContext) {
+        return externalContext.generateTestData(
+                testingSinkSettings, ThreadLocalRandom.current().nextLong());
+    }
+
+    /**
+     * Poll records from the sink.
+     *
+     * @param result Append records to which list
+     * @param reader The sink reader
+     * @param expected The expected list which help to stop polling
+     * @param retryTimes The retry times
+     * @param semantic The semantic
+     * @return Collection of records in the Sink
+     */
+    private List<T> pollAndAppendResultData(
+            List<T> result,
+            ExternalSystemDataReader<T> reader,
+            List<T> expected,
+            int retryTimes,
+            CheckpointingMode semantic) {
+        long timeoutMs = 1000L;
+        int retryIndex = 0;
+
+        while (retryIndex++ < retryTimes
+                && !checkGetEnoughRecordsWithSemantic(expected, result, semantic)) {
+            result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+        }
+        return result;
+    }
+
+    /**
+     * Check whether the polling should stop.
+     *
+     * @param expected The expected list which help to stop polling
+     * @param result The records that have been read
+     * @param semantic The semantic
+     * @return Whether the polling should stop
+     */
+    private boolean checkGetEnoughRecordsWithSemantic(
+            List<T> expected, List<T> result, CheckpointingMode semantic) {
+        checkNotNull(expected);
+        checkNotNull(result);
+        if (EXACTLY_ONCE.equals(semantic)) {
+            return expected.size() <= result.size();
+        } else if (AT_LEAST_ONCE.equals(semantic)) {
+            Set<Integer> matchedIndex = new HashSet<>();
+            for (T record : expected) {
+                int before = matchedIndex.size();
+                for (int i = 0; i < result.size(); i++) {
+                    if (matchedIndex.contains(i)) {
+                        continue;
+                    }
+                    if (record.equals(result.get(i))) {
+                        matchedIndex.add(i);
+                        break;
+                    }
+                }
+                // if not find the record in the result
+                if (before == matchedIndex.size()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        throw new IllegalStateException(
+                String.format("%s delivery guarantee doesn't support test.", semantic.name()));
+    }
+
+    /**
+     * Compare the test data with actual data in given semantic.
+     *
+     * @param reader the data reader for the sink
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     */
+    private void checkResultWithSemantic(
+            ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+            throws Exception {
+        final ArrayList<T> result = new ArrayList<>();
+        waitUntilCondition(
+                () -> {
+                    pollAndAppendResultData(result, reader, testData, 30, semantic);
+                    try {
+                        CollectIteratorAssertions.assertThat(sort(result).iterator())
+                                .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
+                        return true;
+                    } catch (Throwable t) {
+                        return false;
+                    }
+                },
+                Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
+    }
+
+    /** Compare the metrics. */
+    private boolean compareSinkMetrics(
+            MetricQuerier metricQuerier,
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> context,
+            JobID jobId,
+            String sinkName,
+            long allRecordSize)
+            throws Exception {
+        double sumNumRecordsOut =
+                metricQuerier.getAggregatedMetricsByRestAPI(
+                        testEnv.getRestEndpoint(),
+                        jobId,
+                        sinkName,
+                        MetricNames.IO_NUM_RECORDS_OUT,
+                        getSinkMetricFilter(context));
+        return Precision.equals(allRecordSize, sumNumRecordsOut);
+    }
+
+    /** Sort the list. */
+    private List<T> sort(List<T> list) {
+        return list.stream().sorted().collect(Collectors.toList());
+    }
+
+    private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
+        return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build();
+    }
+
+    private void killJob(JobClient jobClient) throws Exception {
+        terminateJob(jobClient);
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+    }
+
+    private DataStreamSink<T> tryCreateSink(
+            DataStream<T> dataStream,
+            DataStreamSinkExternalContext<T> context,
+            TestingSinkSettings sinkSettings) {
+        try {
+            if (context instanceof DataStreamSinkV1ExternalContext) {
+                org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sinkV1 =
+                        ((DataStreamSinkV1ExternalContext<T>) context).createSink(sinkSettings);
+                return dataStream.sinkTo(sinkV1);
+            } else if (context instanceof DataStreamSinkV2ExternalContext) {
+                Sink<T> sinkV2 =
+                        ((DataStreamSinkV2ExternalContext<T>) context).createSink(sinkSettings);
+                return dataStream.sinkTo(sinkV2);
+            } else {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.",
+                                context.getClass()));
+            }
+        } catch (UnsupportedOperationException e) {
+            // abort the test
+            throw new TestAbortedException("Cannot create a sink satisfying given options.", e);
+        }
+    }
+
+    /**
+     * Return the filter used to filter the sink metric.
+     *
+     * <ul>
+     *   <li>Sink v1: return null.
+     *   <li>Sink v2: return the "Writer" prefix in the `SinkTransformationTranslator`.
+     * </ul>
+     */
+    private String getSinkMetricFilter(DataStreamSinkExternalContext<T> context) {
+        if (context instanceof DataStreamSinkV1ExternalContext) {
+            return null;
+        } else if (context instanceof DataStreamSinkV2ExternalContext) {
+            // See class `SinkTransformationTranslator`
+            return "Writer";
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Get unexpected sink context: %s", context.getClass()));
+        }
+    }
+
+    protected CollectResultIterator<T> addCollectSink(DataStream<T> stream) {
+        TypeSerializer<T> serializer =
+                stream.getType().createSerializer(stream.getExecutionConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
+        sink.name("Data stream collect sink");
+        stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+        return new CollectResultIterator<>(
+                operator.getOperatorIdFuture(),
+                serializer,
+                accumulatorName,
+                stream.getExecutionEnvironment().getCheckpointConfig());
+    }
+
+    private void waitExpectedSizeData(CollectResultIterator<T> iterator, int targetNum) {
+        assertThat(
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    int count = 0;
+                                    while (count < targetNum && iterator.hasNext()) {
+                                        iterator.next();
+                                        count++;
+                                    }
+                                    if (count < targetNum) {
+                                        throw new IllegalStateException(
+                                                String.format(
+                                                        "Fail to get %d records.", targetNum));
+                                    }
+                                    return true;
+                                }))
+                .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 992e12a..a90d05b 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.connector.testframe.utils.MetricQuerier;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -70,11 +71,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
 import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
@@ -447,11 +451,14 @@ public abstract class SourceTestSuiteBase<T> {
         final JobClient jobClient = env.executeAsync("Metrics Test");
 
         final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
+        final ExecutorService executorService = Executors.newCachedThreadPool();
         try {
             waitForAllTaskRunning(
                     () ->
-                            queryRestClient.getJobDetails(
-                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                            getJobDetails(
+                                    new RestClient(new Configuration(), executorService),
+                                    testEnv.getRestEndpoint(),
+                                    jobClient.getJobID()),
                     Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
 
             waitUntilCondition(
@@ -472,6 +479,7 @@ public abstract class SourceTestSuiteBase<T> {
                     Deadline.fromNow(DEFAULT_COLLECT_DATA_TIMEOUT));
         } finally {
             // Clean up
+            executorService.shutdown();
             killJob(jobClient);
         }
     }
@@ -775,7 +783,8 @@ public abstract class SourceTestSuiteBase<T> {
                         testEnv.getRestEndpoint(),
                         jobId,
                         sourceName,
-                        MetricNames.IO_NUM_RECORDS_IN);
+                        MetricNames.IO_NUM_RECORDS_IN,
+                        null);
         return Precision.equals(allRecordSize, sumNumRecordsIn);
     }
 
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
index 9fe13c1..d749132 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQuerier.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
@@ -40,6 +41,9 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -56,16 +60,15 @@ public class MetricQuerier {
         restClient = new RestClient(configuration, Executors.newCachedThreadPool());
     }
 
-    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)
-            throws Exception {
+    public static JobDetailsInfo getJobDetails(
+            RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception {
         String jmAddress = endpoint.getAddress();
         int jmPort = endpoint.getPort();
 
         final JobMessageParameters params = new JobMessageParameters();
         params.jobPathParameter.resolve(jobId);
 
-        return restClient
-                .sendRequest(
+        return client.sendRequest(
                         jmAddress,
                         jmPort,
                         JobDetailsHeaders.getInstance(),
@@ -120,10 +123,11 @@ public class MetricQuerier {
             TestEnvironment.Endpoint endpoint,
             JobID jobId,
             String sourceOrSinkName,
-            String metricName)
+            String metricName,
+            String filter)
             throws Exception {
         // get job details, including the vertex id
-        JobDetailsInfo jobDetailsInfo = getJobDetails(endpoint, jobId);
+        JobDetailsInfo jobDetailsInfo = getJobDetails(restClient, endpoint, jobId);
 
         // get the vertex id for source/sink operator
         JobDetailsInfo.JobVertexDetailsInfo vertex =
@@ -143,8 +147,8 @@ public class MetricQuerier {
                 metricsResponseBody.getMetrics().stream()
                         .filter(
                                 m ->
-                                        m.getId().endsWith(metricName)
-                                                && m.getId().contains(sourceOrSinkName))
+                                        filterByMetricName(
+                                                m.getId(), sourceOrSinkName, metricName, filter))
                         .map(m -> m.getId())
                         .collect(Collectors.joining(","));
 
@@ -157,6 +161,27 @@ public class MetricQuerier {
 
         AggregatedMetricsResponseBody metricsResponse =
                 getMetrics(endpoint, jobId, vertexId, queryParam);
-        return metricsResponse.getMetrics().iterator().next().getSum();
+
+        Collection<AggregatedMetric> metrics = metricsResponse.getMetrics();
+        if (metrics == null || metrics.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot find metric[%s] for operator [%s] with filter [%s].",
+                            metricName, sourceOrSinkName, filter));
+        }
+        return metrics.iterator().next().getSum();
+    }
+
+    private boolean filterByMetricName(
+            String metricName,
+            String sourceOrSinkName,
+            String targetMetricName,
+            @Nullable String filter) {
+        boolean filterByName =
+                metricName.endsWith(targetMetricName) && metricName.contains(sourceOrSinkName);
+        if (!StringUtils.isNullOrWhitespaceOnly(filter)) {
+            return filterByName && metricName.contains(filter);
+        }
+        return filterByName;
     }
 }

[flink] 02/02: [hotfix][tests][connector/kafka] Disable KafkaSink metric tests until FLINK-26126 has been fixed

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3106f14c06c8ee619afc516cb627b83e5ec4ca74
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Tue Feb 15 20:43:30 2022 +0800

    [hotfix][tests][connector/kafka] Disable KafkaSink metric tests until FLINK-26126 has been fixed
---
 .../org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java   | 9 +++++++++
 .../java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java | 9 +++++++++
 2 files changed, 18 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 4234f7a..32210c6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -205,6 +205,15 @@ public class KafkaSinkITCase extends TestLogger {
         @TestContext
         KafkaSinkExternalContextFactory sinkContext =
                 new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
+
+        @Disabled("Skip metric test until FLINK-26126 fixed")
+        @TestTemplate
+        @Override
+        public void testMetrics(
+                TestEnvironment testEnv,
+                DataStreamSinkExternalContext<String> externalContext,
+                CheckpointingMode semantic)
+                throws Exception {}
     }
 
     @Test
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index 520491e..ff845f4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -84,4 +84,13 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
                                     .toURL()));
 
     public KafkaSinkE2ECase() throws Exception {}
+
+    @Disabled("Skip metric test until FLINK-26126 fixed")
+    @TestTemplate
+    @Override
+    public void testMetrics(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<String> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {}
 }