Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/27 09:48:07 UTC

[GitHub] [flink] PatrickRen commented on a change in pull request #18496: [FLINK-25289][tests] add sink test suite in connector testframe

PatrickRen commented on a change in pull request #18496:
URL: https://github.com/apache/flink/pull/18496#discussion_r793233942



##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
##########
@@ -33,180 +36,194 @@
 
     // ----------------------------  Matcher Builders ----------------------------------
     public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
-            List<List<T>> testRecordsLists) {
-        return new MultipleSplitDataMatcher<>(testRecordsLists);
+            List<List<T>> testRecordsLists, CheckpointingMode semantic) {
+        return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> testData) {
-        return new SingleSplitDataMatcher<>(testData);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists,
+            CheckpointingMode semantic,
+            boolean testDataAllInResult) {
+        return new MultipleSplitDataMatcher<>(
+                testRecordsLists, MultipleSplitDataMatcher.UNSET, semantic, testDataAllInResult);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> testData, int limit) {
-        return new SingleSplitDataMatcher<>(testData, limit);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists, Integer limit, CheckpointingMode semantic) {
+        if (limit == null) {
+            return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
+        }
+        return new MultipleSplitDataMatcher<>(testRecordsLists, limit, semantic);
     }
 
     // ---------------------------- Matcher Definitions --------------------------------
-
     /**
-     * Matcher for validating test data in a single split.
+     * Matcher for validating test data from multiple splits.
+     *
+     * <p>Each list has a pointer (iterator) pointing to the record currently being checked. When a
+     * record is received from the stream, it is compared with the currently pointed-to record of
+     * every list, and the pointer of the matching list moves forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all splits, every pointer
+     * should finally reach the end of its list.
      *
      * @param <T> Type of validating record
      */
-    public static class SingleSplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
+    public static class MultipleSplitDataMatcher<T> extends Condition<Iterator<T>> {
+        private static final Logger LOG = LoggerFactory.getLogger(MultipleSplitDataMatcher.class);
+
         private static final int UNSET = -1;
 
-        private final List<T> testData;
-        private final int limit;
+        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
 
+        private List<List<T>> testData;
         private String mismatchDescription = null;
+        private final int limit;
+        private final int testDataSize;
+        private final CheckpointingMode semantic;
+        private final boolean testDataAllInResult;
 
-        public SingleSplitDataMatcher(List<T> testData) {
-            this.testData = testData;
-            this.limit = UNSET;
+        public MultipleSplitDataMatcher(List<List<T>> testData, CheckpointingMode semantic) {
+            this(testData, UNSET, semantic);
         }
 
-        public SingleSplitDataMatcher(List<T> testData, int limit) {
-            if (limit > testData.size()) {
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData, int limit, CheckpointingMode semantic) {
+            this(testData, limit, semantic, true);
+        }
+
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData,
+                int limit,
+                CheckpointingMode semantic,
+                boolean testDataAllInResult) {
+            super();
+            int allSize = 0;
+            for (List<T> testRecordsList : testData) {
+                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
+                allSize += testRecordsList.size();
+            }
+
+            if (limit > allSize) {
                 throw new IllegalArgumentException(
                         "Limit validation size should be less than number of test records");
             }
+            this.testDataAllInResult = testDataAllInResult;
             this.testData = testData;
+            this.semantic = semantic;
+            this.testDataSize = allSize;
             this.limit = limit;
         }
 
         @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
+        public boolean matches(Iterator<T> resultIterator) {
+            if (CheckpointingMode.AT_LEAST_ONCE.equals(semantic)) {
+                return matchAtLeastOnce(resultIterator);
             }
+            return matchExactlyOnce(resultIterator);
+        }
 
-            boolean dataMismatch = false;
-            boolean sizeMismatch = false;
-            String sizeMismatchDescription = "";
-            String dataMismatchDescription = "";
+        protected boolean matchExactlyOnce(Iterator<T> resultIterator) {
             int recordCounter = 0;
-            for (T testRecord : testData) {
-                if (!resultIterator.hasNext()) {
-                    sizeMismatchDescription =
-                            String.format(
-                                    "Expected to have %d records in result, but only received %d records",
-                                    limit == UNSET ? testData.size() : limit, recordCounter);
-                    sizeMismatch = true;
-                    break;
-                }
-                T resultRecord = resultIterator.next();
-                if (!testRecord.equals(resultRecord)) {
-                    dataMismatchDescription =
-                            String.format(
-                                    "Mismatched record at position %d: Expected '%s' but was '%s'",
-                                    recordCounter, testRecord, resultRecord);
-                    dataMismatch = true;
+            while (resultIterator.hasNext()) {
+                final T record = resultIterator.next();
+                if (!matchThenNext(record)) {
+                    if (recordCounter >= testDataSize) {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Expected to have exactly %d records in result, but received more records",
+                                                testRecordsLists.stream()
+                                                        .mapToInt(list -> list.records.size())
+                                                        .sum()),
+                                        resultIterator);
+                    } else {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Unexpected record '%s' at position %d",
+                                                record, recordCounter),
+                                        resultIterator);
+                    }
+                    logError();

Review comment:
       Do we need to write the error message to the log? I think the error will already be reflected in `mismatchDescription` and in the exception thrown
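
For illustration, here is a minimal sketch of the pointer-per-split matching described in the Javadoc above; the class and method names are invented for the example and are not part of the PR:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Minimal sketch: one "pointer" (iterator) per split's expected records.
final class SplitPointerSketch {
    static <T> boolean matchesAllSplits(List<List<T>> expectedPerSplit, Iterator<T> result) {
        List<Iterator<T>> pointers = new ArrayList<>();
        List<T> heads = new ArrayList<>(); // the record each pointer currently points at
        for (List<T> split : expectedPerSplit) {
            Iterator<T> it = split.iterator();
            pointers.add(it);
            heads.add(it.hasNext() ? it.next() : null);
        }
        while (result.hasNext()) {
            T record = result.next();
            boolean matched = false;
            for (int i = 0; i < heads.size(); i++) {
                if (record.equals(heads.get(i))) {
                    // Advance only the pointer whose current record matched.
                    heads.set(i, pointers.get(i).hasNext() ? pointers.get(i).next() : null);
                    matched = true;
                    break;
                }
            }
            if (!matched) {
                return false; // unexpected or out-of-order record
            }
        }
        // Content and per-split order are correct iff every pointer reached its end.
        return heads.stream().allMatch(Objects::isNull);
    }
}
```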

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
##########
@@ -86,6 +93,8 @@
 public abstract class SourceTestSuiteBase<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);
+    static ExecutorService executorService =

Review comment:
       Could we move this `ExecutorService` to a util class? Having this executor as a static member of the test suite is kinda weird 🤔 
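
One possible shape for that util class, sketched here with a hypothetical name (`TestPoolUtils` is not in the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Holder for a pool shared by all test suites, instead of one static field per suite. */
public final class TestPoolUtils {
    private TestPoolUtils() {}

    // Sized like the field currently declared in the suites.
    private static final ExecutorService POOL =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static ExecutorService sharedPool() {
        return POOL;
    }
}
```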

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/MockEnumerator.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.connector.testframe.source.split.ListSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Mock enumerator. */
+public class MockEnumerator implements SplitEnumerator<ListSplit, MockEnumState> {

Review comment:
       What about `NoOpEnumerator`?
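
If it is renamed, the body can stay a pure no-op along these lines (a sketch against the `SplitEnumerator` interface; it assumes `MockEnumState` has a no-arg constructor):

```java
/** A split enumerator whose callbacks intentionally do nothing. */
public class NoOpEnumerator implements SplitEnumerator<ListSplit, MockEnumState> {
    @Override
    public void start() {}

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}

    @Override
    public void addSplitsBack(List<ListSplit> splits, int subtaskId) {}

    @Override
    public void addReader(int subtaskId) {}

    @Override
    public MockEnumState snapshotState(long checkpointId) throws Exception {
        return new MockEnumState();
    }

    @Override
    public void close() throws IOException {}
}
```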

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+    private final long jobExecuteTimeMs = 20000;
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector data stream sink.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write them to this sink via a Flink job.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink are compared with the test
+     * data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        execEnv.fromCollection(testRecords)
+                .name("sourceInSinkTest")
+                .setParallelism(1)
+                .returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        // Check test result
+        List<T> target = sort(testRecords);

Review comment:
       Could we wrap this `sort()` into the helper function `checkResult()`?
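
A sketch of that refactoring, with the polling logic elided since it already exists in the PR:

```java
// Hedged sketch: checkResult() owns the sort, so call sites pass the raw records.
private void checkResult(
        ExternalSystemDataReader<T> reader, List<T> testRecords, CheckpointingMode semantic)
        throws Exception {
    List<T> expected = sort(testRecords); // comparison is order-insensitive anyway
    // ... poll `reader` and match it against `expected` as the existing body does ...
}
```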

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+    /**
+     * Test connector sink restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink via a Flink job with parallelism 2. Then stop
+     * the job and restart the same job from the completed savepoint. After the job is running
+     * again, write the other half to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink are compared with the test
+     * data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink via a Flink job with parallelism 2. Then stop
+     * the job and restart the same job from the completed savepoint with a higher parallelism of
+     * 4. After the job is running again, write the other half to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink are compared with the test
+     * data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink via a Flink job with parallelism 4. Then stop
+     * the job and restart the same job from the completed savepoint with a lower parallelism of
+     * 2. After the job is running again, write the other half to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink are compared with the test
+     * data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+        CollectResultIterator<T> iterator = addCollectSink(source);

Review comment:
       It'll be more descriptive to have a comment here explaining why we need the collect sink
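
For context, `addCollectSink` presumably wires up Flink's collect mechanism so the test can observe what the job has emitted before stopping it with a savepoint. A sketch inferred from the collect-sink classes imported by this file (not necessarily the PR's exact body):

```java
private CollectResultIterator<T> addCollectSink(DataStream<T> stream) {
    TypeSerializer<T> serializer =
            stream.getType().createSerializer(stream.getExecutionConfig());
    String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
    CollectSinkOperatorFactory<T> factory =
            new CollectSinkOperatorFactory<>(serializer, accumulatorName);
    CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
    CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
    sink.name("Data stream collect sink");
    stream.getExecutionEnvironment().addOperator(sink.getTransformation());
    // The iterator mirrors the sink's records back into the test process.
    return new CollectResultIterator<>(
            operator.getOperatorIdFuture(),
            serializer,
            accumulatorName,
            stream.getExecutionEnvironment().getCheckpointConfig());
}
```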

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        String savepointDir;
+        try {
+            final MetricQueryer queryRestClient =

Review comment:
       Is there any specific reason to use a metric-related util class in a savepoint case?

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestUtils.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test utils. */
+public class TestUtils {

Review comment:
       It's better to have JavaDocs for these util methods
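
For example (the `timeoutAssert` signature is assumed here purely for illustration):

```java
/**
 * Runs {@code runnable} on {@code executor} and fails the assertion if it
 * does not complete within the given timeout.
 */
public static void timeoutAssert(
        ExecutorService executor, Runnable runnable, long timeout, TimeUnit timeUnit) {
    Future<?> future = executor.submit(runnable);
    try {
        future.get(timeout, timeUnit);
    } catch (Exception e) {
        future.cancel(true); // interrupt the still-running assertion
        throw new AssertionError("Assertion did not complete within timeout", e);
    }
}
```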

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics by rest API. */
+public class MetricQueryer {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryer.class);
+    private RestClient restClient;
+
+    public MetricQueryer(Configuration configuration, Executor executor)
+            throws ConfigurationException {
+        restClient = new RestClient(configuration, executor);
+    }
+
+    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)

Review comment:
       Why does the **metric** queryer have job-detail-related logic?
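
One way to separate the concerns, sketched with a hypothetical class name and assuming `TestEnvironment.Endpoint` exposes its address and port:

```java
/** General REST facade for job queries, leaving the metric class metric-only. */
public class RestApiQueryer {
    private final RestClient restClient;

    public RestApiQueryer(Configuration configuration, Executor executor)
            throws ConfigurationException {
        this.restClient = new RestClient(configuration, executor);
    }

    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)
            throws Exception {
        JobMessageParameters params = new JobMessageParameters();
        params.jobPathParameter.resolve(jobId);
        return restClient
                .sendRequest(
                        endpoint.getAddress(),
                        endpoint.getPort(),
                        JobDetailsHeaders.getInstance(),
                        params,
                        EmptyRequestBody.getInstance())
                .get(30, TimeUnit.SECONDS);
    }
}
```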

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testSink(

Review comment:
       The name of the test is quite ambiguous. What about `testBasicSink`?
   
   Also I think we should make this case as simple as possible. What about disabling checkpointing and setting the sink's parallelism to 1?
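
A sketch of that simplification, reusing only names already present in this diff:

```java
// Basic case: no checkpointing, single-threaded sink, so only the plain
// write path is exercised.
StreamExecutionEnvironment execEnv =
        testEnv.createExecutionEnvironment(
                TestEnvironmentSettings.builder()
                        .setConnectorJarPaths(externalContext.getConnectorJarPaths())
                        .build());
// Note: no enableCheckpointing() call here.
execEnv.fromCollection(testRecords)
        .name("sourceInSinkTest")
        .setParallelism(1)
        .returns(externalContext.getProducedType())
        .sinkTo(tryCreateSink(externalContext, sinkSettings))
        .setParallelism(1) // keep the sink single-threaded for the basic case
        .name("sinkInSinkTest");
execEnv.execute("Basic DataStream Sink Test");
```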

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
##########
@@ -400,6 +414,36 @@ protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) th
                 stream.getExecutionEnvironment().getCheckpointConfig());
     }
 
+    /**
+     * Compare the test data with the result.
+     *
+     * <p>If the source is bounded, limit should be null.
+     *
+     * @param resultIterator the data read from the job
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     * @param limit expected number of records to read from the job
+     */
+    private void checkResultBySemantic(
+            CloseableIterator<T> resultIterator,
+            List<List<T>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        if (limit != null) {
+            timeoutAssert(

Review comment:
       What about using `Assertions.assertThat(CompletableFuture).succeedsWithin(Duration)`? But this requires changing the utils in `TestDataMatchers` to a future-based style
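
A sketch of that future-based style, assuming the names from this method (`resultIterator`, `testData`, `semantic`) and the suite's shared `executorService`; `Assertions` is AssertJ's entry point:

```java
// Run the (blocking) match inside a future and let AssertJ enforce the timeout.
CompletableFuture<Void> matchFuture =
        CompletableFuture.runAsync(
                () ->
                        Assertions.assertThat(resultIterator)
                                .is(matchesMultipleSplitTestData(testData, semantic)),
                executorService);
Assertions.assertThat(matchFuture).succeedsWithin(Duration.ofMinutes(5));
```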

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/ListSource.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.testframe.source.enumerator.MockEnumState;
+import org.apache.flink.connector.testframe.source.enumerator.MockEnumStateSerializer;
+import org.apache.flink.connector.testframe.source.enumerator.MockEnumerator;
+import org.apache.flink.connector.testframe.source.split.ListSplit;
+import org.apache.flink.connector.testframe.source.split.ListSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.List;
+
+/**
+ * The source reads data from a list and stops reading at the fixed position. The source will wait
+ * until the checkpoint or savepoint triggers.
+ *
+ * <p>Note that this source must be of parallelism 1.
+ */
+public class ListSource<OUT> implements Source<OUT, ListSplit, MockEnumState> {

Review comment:
       What about using `FromElementsSource` to align with `FromElementsFunction`? And we can create another PR to replace `FromElementsFunction` with the Source API in the future.

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

Review comment:
       Is there any reason that `T` should be `Comparable`?

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+    private final long jobExecuteTimeMs = 20000;

Review comment:
       Use `Duration` and define it as a `static` constant.
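   
   E.g. (sketch; the constant name is illustrative):
   
   ```java
   private static final Duration JOB_EXECUTE_TIMEOUT = Duration.ofSeconds(20);
   ```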

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestUtils.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test utils. */
+public class TestUtils {
+    public static File newFolder(Path path) throws IOException {
+        Path tempPath = Files.createTempDirectory(path, "testing-framework", new FileAttribute[0]);
+        return tempPath.toFile();
+    }
+
+    public static <T> List<T> appendResultData(
+            List<T> result,
+            ExternalSystemDataReader<T> reader,
+            List<T> expected,
+            int retryTimes,
+            CheckpointingMode semantic) {
+        long timeoutMs = 1000L;
+        int retryIndex = 0;
+        if (EXACTLY_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && result.size() < expected.size()) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        } else if (AT_LEAST_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && !containSameVal(expected, result, semantic)) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        }
+        throw new IllegalStateException(
+                String.format("%s delivery guarantee doesn't support test.", semantic.name()));
+    }
+
+    public static <T> boolean containSameVal(
+            List<T> expected, List<T> result, CheckpointingMode semantic) {
+        checkNotNull(expected);
+        checkNotNull(result);
+
+        Set<Integer> matchedIndex = new HashSet<>();
+        if (EXACTLY_ONCE.equals(semantic) && expected.size() != result.size()) {
+            return false;
+        }
+        for (T rowData0 : expected) {
+            int before = matchedIndex.size();
+            for (int i = 0; i < result.size(); i++) {
+                if (matchedIndex.contains(i)) {
+                    continue;
+                }
+                if (rowData0.equals(result.get(i))) {
+                    matchedIndex.add(i);
+                    break;
+                }
+            }
+            if (before == matchedIndex.size()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static void timeoutAssert(
+            ExecutorService executorService, Runnable task, long time, TimeUnit timeUnit) {
+        Future future = executorService.submit(task);
+        try {
+            future.get(time, timeUnit);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Test failed to get the result.", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Test failed with some exception.", e);
+        } catch (TimeoutException e) {
+            throw new RuntimeException(
+                    String.format("Test timeout after %d %s.", time, timeUnit.name()), e);
+        } finally {
+            future.cancel(true);
+        }
+    }
+
+    public static void deletePath(Path path) throws IOException {

Review comment:
       Apache Commons has a helper function:
   
   ```java
   import org.apache.commons.io.FileUtils;
   FileUtils.deleteDirectory(File);
   ```
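   
   (`deleteDirectory` removes the directory tree recursively and throws `IOException` on failure, so `deletePath(Path)` could delegate to `FileUtils.deleteDirectory(path.toFile())` directly.)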

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+    private final long jobExecuteTimeMs = 20000;
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector data stream sink.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write them to this sink by the Flink Job.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        execEnv.fromCollection(testRecords)
+                .name("sourceInSinkTest")
+                .setParallelism(1)
+                .returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        // Check test result
+        List<T> target = sort(testRecords);
+        checkResult(externalContext.createSinkDataReader(sinkSettings), target, semantic);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint. After the job has been
+     * running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a higher parallelism 4.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 4 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a lower parallelism 2.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        String savepointDir;
+        try {
+            final MetricQueryer queryRestClient =
+                    new MetricQueryer(new Configuration(), executorService);
+            waitForAllTaskRunning(
+                    () ->
+                            queryRestClient.getJobDetails(
+                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+
+            timeoutAssert(
+                    executorService,
+                    () -> {
+                        int count = 0;
+                        while (count < numBeforeSuccess && iterator.hasNext()) {
+                            iterator.next();
+                            count++;
+                        }
+                        if (count < numBeforeSuccess) {
+                            throw new IllegalStateException(
+                                    String.format("Fail to get %d records.", numBeforeSuccess));
+                        }
+                    },
+                    30,
+                    TimeUnit.SECONDS);
+            savepointDir =
+                    jobClient
+                            .stopWithSavepoint(true, testEnv.getCheckpointUri())
+                            .get(30, TimeUnit.SECONDS);
+            waitForJobStatus(
+                    jobClient,
+                    Collections.singletonList(JobStatus.FINISHED),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+        } catch (Exception e) {
+            killJob(jobClient);
+            throw e;
+        }
+
+        List<T> target = sort(testRecords.subList(0, numBeforeSuccess));

Review comment:
       We can move the `sort` call into `checkResult`, so callers don't have to pre-sort the expected data.
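   
   E.g. the call sites would then shrink to (sketch):
   
   ```java
   // Sketch: checkResult sorts internally, so callers pass the raw sub-list.
   checkResult(
           externalContext.createSinkDataReader(sinkSettings),
           testRecords.subList(0, numBeforeSuccess),
           semantic);
   ```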

##########
File path: flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
##########
@@ -224,6 +223,15 @@ under the License.
 							<type>jar</type>
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
+						<artifactItem>

Review comment:
       We can remove this snippet if we move `ListSource` (`FromElementsSource`) to `flink-streaming-java`.

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestUtils.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test utils. */
+public class TestUtils {
+    public static File newFolder(Path path) throws IOException {
+        Path tempPath = Files.createTempDirectory(path, "testing-framework", new FileAttribute[0]);
+        return tempPath.toFile();
+    }
+
+    public static <T> List<T> appendResultData(
+            List<T> result,
+            ExternalSystemDataReader<T> reader,
+            List<T> expected,
+            int retryTimes,
+            CheckpointingMode semantic) {
+        long timeoutMs = 1000L;
+        int retryIndex = 0;
+        if (EXACTLY_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && result.size() < expected.size()) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        } else if (AT_LEAST_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && !containSameVal(expected, result, semantic)) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        }
+        throw new IllegalStateException(
+                String.format("%s delivery guarantee doesn't support test.", semantic.name()));
+    }
+
+    public static <T> boolean containSameVal(
+            List<T> expected, List<T> result, CheckpointingMode semantic) {
+        checkNotNull(expected);
+        checkNotNull(result);
+
+        Set<Integer> matchedIndex = new HashSet<>();
+        if (EXACTLY_ONCE.equals(semantic) && expected.size() != result.size()) {
+            return false;
+        }
+        for (T rowData0 : expected) {
+            int before = matchedIndex.size();
+            for (int i = 0; i < result.size(); i++) {
+                if (matchedIndex.contains(i)) {
+                    continue;
+                }
+                if (rowData0.equals(result.get(i))) {
+                    matchedIndex.add(i);
+                    break;
+                }
+            }
+            if (before == matchedIndex.size()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static void timeoutAssert(
+            ExecutorService executorService, Runnable task, long time, TimeUnit timeUnit) {
+        Future future = executorService.submit(task);
+        try {
+            future.get(time, timeUnit);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Test failed to get the result.", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Test failed with some exception.", e);
+        } catch (TimeoutException e) {
+            throw new RuntimeException(
+                    String.format("Test timeout after %d %s.", time, timeUnit.name()), e);
+        } finally {
+            future.cancel(true);
+        }
+    }
+
+    public static void deletePath(Path path) throws IOException {
+        List<File> files =
+                Files.walk(path)
+                        .filter(p -> p != path)
+                        .map(Path::toFile)
+                        .collect(Collectors.toList());
+        for (File file : files) {
+            if (file.isDirectory()) {
+                deletePath(file.toPath());
+            } else {
+                file.delete();
+            }
+        }
+        Files.deleteIfExists(path);
+    }
+
+    public static boolean doubleEquals(double d0, double d1) {

Review comment:
       Also in Apache Commons:
   
   ```java
   import org.apache.commons.math3.util.Precision;
   Precision.equals(d0, d1);
   ```
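   
   (`Precision.equals(double, double)` treats values within 1 ulp as equal by default; there is also an overload that takes an explicit epsilon.)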

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suite.
+ *
+ * <p>All cases should have well-descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>What's the purpose of this case
+ *   <li>Simple description of how this case works
+ *   <li>Condition to fulfill in order to pass this case
+ *   <li>Requirement of running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+    private final long jobExecuteTimeMs = 20000;
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector data stream sink.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write them to this sink by the Flink Job.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        execEnv.fromCollection(testRecords)
+                .name("sourceInSinkTest")
+                .setParallelism(1)
+                .returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        // Check test result
+        List<T> target = sort(testRecords);
+        checkResult(externalContext.createSinkDataReader(sinkSettings), target, semantic);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint. After the job has been
+     * running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 2 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a higher parallelism 4.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector source restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write a half part of them to this sink by the Flink Job with parallelism 4 at first. Then
+     * stop the job, restart the same job from the completed savepoint with a lower parallelism 2.
+     * After the job has been running, write the other part to the sink and compare the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink need to be equals to
+     * the generated test data. And the records in the sink will be compared to the test data by the
+     * different semantic. There's no requirement for record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        String savepointDir;
+        try {
+            final MetricQueryer queryRestClient =
+                    new MetricQueryer(new Configuration(), executorService);
+            waitForAllTaskRunning(
+                    () ->
+                            queryRestClient.getJobDetails(
+                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+
+            timeoutAssert(
+                    executorService,
+                    () -> {
+                        int count = 0;
+                        while (count < numBeforeSuccess && iterator.hasNext()) {

Review comment:
       What about extracting a helper function for readability?
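   
   E.g. (sketch; `consumeAtLeast` is a placeholder name):
   
   ```java
   // Sketch of a helper extracted from the inline counting loop above.
   private static <T> void consumeAtLeast(Iterator<T> iterator, int expectedCount) {
       int count = 0;
       while (count < expectedCount && iterator.hasNext()) {
           iterator.next();
           count++;
       }
       if (count < expectedCount) {
           throw new IllegalStateException(
                   String.format("Fail to get %d records.", expectedCount));
       }
   }
   ```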

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.ListSource;
+import org.apache.flink.connector.testframe.utils.MetricQueryer;
+import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.appendResultData;
+import static org.apache.flink.connector.testframe.utils.TestUtils.doubleEquals;
+import static org.apache.flink.connector.testframe.utils.TestUtils.timeoutAssert;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Base class for sink test suites.
+ *
+ * <p>All cases should have a descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>The purpose of this case
+ *   <li>A simple description of how this case works
+ *   <li>The conditions to fulfill in order to pass this case
+ *   <li>The requirements for running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+
+    private final long jobExecuteTimeMs = 20000;
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector data stream sink.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write them to this sink with a Flink job.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink will be compared to the
+     * test data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        execEnv.fromCollection(testRecords)
+                .name("sourceInSinkTest")
+                .setParallelism(1)
+                .returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        // Check test result
+        List<T> target = sort(testRecords);
+        checkResult(externalContext.createSinkDataReader(sinkSettings), target, semantic);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink with a Flink job running at parallelism 2.
+     * The job is then stopped with a savepoint and restarted from that savepoint. Once the
+     * restarted job is running, the second half is written to the sink and the result is compared.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink will be compared to the
+     * test data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink with a Flink job running at parallelism 2.
+     * The job is then stopped with a savepoint and restarted from that savepoint with a higher
+     * parallelism of 4. Once the restarted job is running, the second half is written to the sink
+     * and the result is compared.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink will be compared to the
+     * test data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test will create a sink in the external system, generate a collection of test data
+     * and write the first half of them to this sink with a Flink job running at parallelism 4.
+     * The job is then stopped with a savepoint and restarted from that savepoint with a lower
+     * parallelism of 2. Once the restarted job is running, the second half is written to the sink
+     * and the result is compared.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to be equal to
+     * the number of generated test records, and the records in the sink will be compared to the
+     * test data according to the configured semantic. There is no requirement on record order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
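+        // Attach a collect sink so the test can count the records emitted before the savepoint.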
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        String savepointDir;
+        try {
+            final MetricQueryer queryRestClient =
+                    new MetricQueryer(new Configuration(), executorService);
+            waitForAllTaskRunning(
+                    () ->
+                            queryRestClient.getJobDetails(
+                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+
+            timeoutAssert(
+                    executorService,
+                    () -> {
+                        int count = 0;
+                        while (count < numBeforeSuccess && iterator.hasNext()) {
+                            iterator.next();
+                            count++;
+                        }
+                        if (count < numBeforeSuccess) {
+                            throw new IllegalStateException(
+                                    String.format("Fail to get %d records.", numBeforeSuccess));
+                        }
+                    },
+                    30,
+                    TimeUnit.SECONDS);
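+            // Stop the job with a savepoint; "true" drains the pipeline by advancing to the
+            // end of event time before taking the savepoint.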
+            savepointDir =
+                    jobClient
+                            .stopWithSavepoint(true, testEnv.getCheckpointUri())
+                            .get(30, TimeUnit.SECONDS);
+            waitForJobStatus(
+                    jobClient,
+                    Collections.singletonList(JobStatus.FINISHED),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+        } catch (Exception e) {
+            killJob(jobClient);
+            throw e;
+        }
+
+        List<T> target = sort(testRecords.subList(0, numBeforeSuccess));
+        checkResult(externalContext.createSinkDataReader(sinkSettings), target, semantic, false);
+
+        // Step 5: Restart the Flink job from the savepoint
+        final StreamExecutionEnvironment restartEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .setSavepointRestorePath(savepointDir)
+                                .build());
+        restartEnv.enableCheckpointing(50);
+
+        DataStreamSource<T> restartSource =
+                restartEnv
+                        .fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "restartSource")
+                        .setParallelism(1);
+
+        restartSource
+                .returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .setParallelism(afterParallelism);
+        addCollectSink(restartSource);
+        final JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
+
+        try {
+            // Check the result
+            checkResult(
+                    externalContext.createSinkDataReader(sinkSettings),
+                    sort(testRecords),
+                    semantic);
+        } finally {
+            killJob(restartJobClient);
+            iterator.close();
+        }
+    }
+
+    /**
+     * Test connector sink metrics.
+     *
+     * <p>This test will create a sink in the external system, generate test data and write them to
+     * the sink via a Flink job. It then reads the metrics via the REST API and compares them with
+     * the expected values.
+     *
+     * <p>Currently tested metrics: numRecordsOut
+     */
+    @TestTemplate
+    @DisplayName("Test sink metrics")
+    public void testMetrics(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        int parallelism = 2;
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Make sure a different name is used each time the test is executed
+        String sinkName = "metricTestSink" + testRecords.hashCode();
+        final StreamExecutionEnvironment env =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        env.enableCheckpointing(50);
+
+        DataStreamSource<T> source =
+                env.fromSource(
+                                new ListSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        testRecords.size()),
+                                WatermarkStrategy.noWatermarks(),
+                                "metricTestSource")
+                        .setParallelism(1);
+
+        source.returns(externalContext.getProducedType())
+                .sinkTo(tryCreateSink(externalContext, sinkSettings))
+                .name(sinkName)
+                .setParallelism(parallelism);
+        final JobClient jobClient = env.executeAsync("Metrics Test");
+        final MetricQueryer queryRestClient =
+                new MetricQueryer(new Configuration(), executorService);
+        try {
+            waitForAllTaskRunning(
+                    () ->
+                            queryRestClient.getJobDetails(
+                                    testEnv.getRestEndpoint(), jobClient.getJobID()),
+                    Deadline.fromNow(Duration.ofSeconds(30)));
+
+            waitUntilCondition(
+                    () -> {
+                        // test metrics
+                        try {
+                            return compareSinkMetrics(
+                                    queryRestClient,
+                                    testEnv,
+                                    jobClient.getJobID(),
+                                    sinkName,
+                                    testRecords.size());
+                        } catch (Exception e) {
+                            // Ignore the failed attempt and retry until the deadline expires
+                            return false;
+                        }
+                    },
+                    Deadline.fromNow(Duration.ofMillis(jobExecuteTimeMs)));
+        } finally {
+            // Clean up
+            killJob(jobClient);
+        }
+    }
+
+    // ----------------------------- Helper Functions ---------------------------------
+
+    /**
+     * Generate a set of test records.
+     *
+     * @param testingSinkSettings sink settings
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected List<T> generateTestData(
+            TestingSinkSettings testingSinkSettings,
+            DataStreamSinkExternalContext<T> externalContext) {
+        return externalContext.generateTestData(
+                testingSinkSettings, ThreadLocalRandom.current().nextLong());
+    }
+
+    /**
+     * Compare the test data with the result.
+     *
+     * @param reader the data reader for the sink
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     */
+    private void checkResult(
+            ExternalSystemDataReader<T> reader, List<T> testData, CheckpointingMode semantic)
+            throws Exception {
+        checkResult(reader, testData, semantic, true);
+    }
+
+    /**
+     * Compare the test data with the result.
+     *
+     * @param reader the data reader for the sink
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     * @param testDataAllInResult whether the result contains all the test data
+     */
+    private void checkResult(
+            ExternalSystemDataReader<T> reader,
+            List<T> testData,
+            CheckpointingMode semantic,
+            boolean testDataAllInResult)
+            throws Exception {
+        final ArrayList<T> result = new ArrayList<>();
+        final TestDataMatchers.MultipleSplitDataMatcher<T> matcher =
+                matchesMultipleSplitTestData(
+                        Arrays.asList(testData), semantic, testDataAllInResult);
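+        // Poll the external system and re-run the match until it succeeds or the deadline expires.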
+        waitUntilCondition(
+                () -> {
+                    appendResultData(result, reader, testData, 30, semantic);
+                    return matcher.matches(sort(result).iterator());
+                },
+                Deadline.fromNow(Duration.ofMillis(jobExecuteTimeMs)));
+    }
+
+    /** Check whether the sink's numRecordsOut metric matches the expected record count. */
+    private boolean compareSinkMetrics(
+            MetricQueryer metricQueryer,
+            TestEnvironment testEnv,
+            JobID jobId,
+            String sinkName,
+            long allRecordSize)
+            throws Exception {
+        double sumNumRecordsOut =
+                metricQueryer.getMetricByRestApi(
+                        testEnv.getRestEndpoint(), jobId, sinkName, MetricNames.IO_NUM_RECORDS_OUT);
+        return doubleEquals(allRecordSize, sumNumRecordsOut);
+    }
+
+    /** Return a sorted copy of the given list. */
+    private List<T> sort(List<T> list) {
+        return list.stream().sorted().collect(Collectors.toList());
+    }
+
+    private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) {
+        return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build();
+    }
+
+    private void killJob(JobClient jobClient) throws Exception {
+        terminateJob(jobClient, Duration.ofSeconds(30));
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+    }
+
+    private Sink<T, ?, ?, ?> tryCreateSink(
+            DataStreamSinkExternalContext<T> context, TestingSinkSettings sinkSettings) {
+        try {
+            return context.createSink(sinkSettings);
+        } catch (UnsupportedOperationException e) {
+            // abort the test
+            throw new TestAbortedException("Not support this test.", e);

Review comment:
       "Cannot create a sink satisfying given options"

##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics via the REST API. */
+public class MetricQueryer {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryer.class);
+    private RestClient restClient;
+
+    public MetricQueryer(Configuration configuration, Executor executor)

Review comment:
       Maybe `MetricQueryer` could construct its own private executor instead of using an outside one?
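
       Something like this minimal sketch (names are illustrative; it assumes the `RestClient(Configuration, Executor)` construction this class already performs):

       ```java
       public MetricQueryer(Configuration configuration) throws ConfigurationException {
           // Own a private single-threaded executor instead of borrowing the caller's.
           this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "metric-queryer"));
           this.restClient = new RestClient(configuration, executor);
       }
       ```

       A matching `close()` method would then shut the executor down together with the REST client.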




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org