Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/24 10:16:43 UTC

[GitHub] [flink] fsk119 commented on a change in pull request #17537: [FLINK-19722][table-planner]Pushdown Watermark to SourceProvider (new Source)

fsk119 commented on a change in pull request #17537:
URL: https://github.com/apache/flink/pull/17537#discussion_r735066347



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
##########
@@ -97,9 +104,40 @@ public DynamicTableSourceSpec getTableSourceSpec() {
             return createInputFormatTransformation(env, inputFormat, outputTypeInfo, operatorName);
         } else if (provider instanceof SourceProvider) {
             Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
-            // TODO: Push down watermark strategy to source scan
-            return env.fromSource(
-                            source, WatermarkStrategy.noWatermarks(), operatorName, outputTypeInfo)
+            // don't use rowTypes from CatalogTable
+            // because the rowType number may be reduced by ProjectionPushDown
+            RowType sourceRowType = outputTypeInfo.toRowType();
+            SourceAbilityContext sourceAbilityContext =
+                    new SourceAbilityContext(planner.getFlinkContext(), sourceRowType);
+
+            WatermarkStrategy<RowData> watermarkStrategy = null;
+
+            if (tableSource instanceof SupportsWatermarkPushDown) {
+                for (SourceAbilitySpec sourceAbilitySpec :
+                        tableSourceSpec.getSourceAbilitySpecs()) {

Review comment:
       Use `Preconditions.checkNotNull` to silence the compile warnings.
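
    For illustration, a rough sketch of the idea (hedged: it assumes the warnings come from the nullable `getSourceAbilitySpecs()` result, and the message text is only illustrative):

    ```
    // Sketch only: make the nullability explicit instead of leaving a compiler warning.
    for (SourceAbilitySpec sourceAbilitySpec :
            Preconditions.checkNotNull(
                    tableSourceSpec.getSourceAbilitySpecs(),
                    "Ability specs must be present when watermark push-down is applied.")) {
        // resolve the pushed-down WatermarkStrategy from the spec here
    }
    ```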

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;

Review comment:
       Mark as `final`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;
+    private final String tableName;
+    private final boolean isInsertOnly;
+    private final String runtimeSink;
+    private final int expectedNum;
+    private final Map<String, DataType> writableMetadata;
+    private final Integer parallelism;
+    private final ChangelogMode changelogModeEnforced;
+    private final int rowtimeIndex;
+
+    public TestValuesTableSink(
+            DataType consumedDataType,
+            int[] primaryKeyIndices,
+            String tableName,
+            boolean isInsertOnly,
+            String runtimeSink,
+            int expectedNum,
+            Map<String, DataType> writableMetadata,
+            @Nullable Integer parallelism,
+            @Nullable ChangelogMode changelogModeEnforced,
+            int rowtimeIndex) {
+        this.consumedDataType = consumedDataType;
+        this.primaryKeyIndices = primaryKeyIndices;
+        this.tableName = tableName;
+        this.isInsertOnly = isInsertOnly;
+        this.runtimeSink = runtimeSink;
+        this.expectedNum = expectedNum;
+        this.writableMetadata = writableMetadata;
+        this.parallelism = parallelism;
+        this.changelogModeEnforced = changelogModeEnforced;
+        this.rowtimeIndex = rowtimeIndex;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        // if param [changelogModeEnforced] is passed in, return it directly
+        if (changelogModeEnforced != null) {
+            return changelogModeEnforced;
+        }
+        if (isInsertOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            if (primaryKeyIndices.length > 0) {
+                // can update on key, ignore UPDATE_BEFORE
+                return ChangelogMode.upsert();
+            } else {
+                // don't have key, works in retract mode
+                return requestedMode;
+            }
+        }
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataStructureConverter converter = context.createDataStructureConverter(consumedDataType);
+        final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
+        final Boolean isEnforcedInsertOnly =
+                Optional.ofNullable(changelogModeEnforced)
+                        .map(changelogMode -> changelogMode.equals(ChangelogMode.insertOnly()))
+                        .orElse(false);
+        final Boolean isInsertOnly = isEnforcedInsertOnly || this.isInsertOnly;
+        if (isInsertOnly) {
+            Preconditions.checkArgument(
+                    expectedNum == -1,
+                    "Appending Sink doesn't support '"
+                            + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
+                            + "' yet.");
+            switch (runtimeSink) {
+                case "SinkFunction":
+                    return new SinkFunctionProvider() {
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+
+                        @Override
+                        public SinkFunction<RowData> createSinkFunction() {
+                            return new AppendingSinkFunction(tableName, converter, rowtimeIndex);
+                        }
+                    };
+                case "OutputFormat":
+                    return new OutputFormatProvider() {
+                        @Override
+                        public OutputFormat<RowData> createOutputFormat() {
+                            return new AppendingOutputFormat(tableName, converter);
+                        }
+
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }

Review comment:
       Use `OutputFormatProvider#of`.
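
    Roughly like the following sketch, assuming the overload that also carries the sink parallelism is available in this Flink version (otherwise the single-argument `of` would do):

    ```
    // Sketch: replace the anonymous OutputFormatProvider with the static factory.
    return OutputFormatProvider.of(
            new AppendingOutputFormat(tableName, converter), parallelism);
    ```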

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesFactoryUtils.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Utils for {@link TestValuesTableFactory}. */
+public class TestValuesFactoryUtils {
+    public static int validateAndExtractRowtimeIndex(
+            CatalogTable sinkTable, boolean dropLateEvent, boolean isInsertOnly) {
+        if (!dropLateEvent) {
+            return -1;
+        } else if (!isInsertOnly) {

Review comment:
       I think it may also have a rowtime even if the sink doesn't drop late events...

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala
##########
@@ -41,8 +42,155 @@ class SourceWatermarkITCase extends StreamingTestBase {
   @Rule
   def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
 
+  val valuesSourceData = Seq(
+    row(1, 1L, LocalDateTime.parse("2020-11-21T19:00:05.23")),
+    row(2, 2L, LocalDateTime.parse("2020-11-21T19:00:10.23")),
+    row(3, 3L, LocalDateTime.parse("2020-11-21T19:00:15.23")),
+    row(4, 4L, LocalDateTime.parse("2020-11-21T19:00:20.23"))
+  )
+
+  @Test
+  def testWatermarkPushDownInValuesSource(): Unit = {
+
+    val dataId = TestValuesTableFactory.registerData(valuesSourceData)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE ValuesSourceTable (
+         |  a INT,
+         |  b BIGINT,
+         |  c TIMESTAMP(3),
+         |  d as c - INTERVAL '5' second,
+         |  WATERMARK FOR d as d + INTERVAL '5' second) WITH(
+         |  'connector' = 'values',
+         |  'enable-watermark-push-down' = 'true',
+         |  'data-id' = '$dataId',
+         |  'enable-lookup' = 'false',
+         |  'runtime-source' = 'Source'
+         |)
+         |""".stripMargin)
+
+    // avoid to generate too many watermarks with timestamp Long.MinValue
+    tEnv.getConfig.getConfiguration
+      .set(CoreOptions.DEFAULT_PARALLELISM.asInstanceOf[ConfigOption[Any]], 1)
+
+    tEnv.executeSql(
+      s"""
+         | CREATE Table CollectingWatermarkSinkTable (
+         |   a INT,
+         |   b BIGINT,
+         |   c TIMESTAMP(3)
+         | ) with (
+         | 'connector' = 'values',
+         | 'sink-insert-only' = 'false',
+         | 'runtime-sink' = 'SinkWithCollectingWatermark'
+         | )
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        | INSERT INTO CollectingWatermarkSinkTable
+        | SELECT a, b, c FROM ValuesSourceTable
+        |""".stripMargin).await()
+
+    // the first watermark timestamp is always Long.MinValue
+    val expectedWatermarkOutput = Seq(
+      "+1705471-09-26T16:47:04.192",
+      "2020-11-21T19:00:05.230",
+      "2020-11-21T19:00:10.230",
+      "2020-11-21T19:00:15.230"
+    )
+    val expectedData = List(
+      "1,1,2020-11-21T19:00:05.230",
+      "2,2,2020-11-21T19:00:10.230",
+      "3,3,2020-11-21T19:00:15.230",
+      "4,4,2020-11-21T19:00:20.230"
+    )
+
+    val sinkName = "CollectingWatermarkSinkTable"
+
+    val result = TestValuesTableFactory.getResults(sinkName).asScala.toList
+
+    val actualWatermark = TestValuesTableFactory.getWatermarkOutput(sinkName)

Review comment:
       I am not sure whether it is a good idea to collect the actual watermark in the sink, because we may not receive the expected watermark: the watermark generator is triggered periodically by the time service.
    
    After digging in, I found we can emit a watermark per record by reimplementing the reader in the source. Here is my POC code.
   
   ```
   private static class TestValuesReader
               extends IteratorSourceReader<RowData, TestValuesIterator, TestValuesSplit> {
   
           public TestValuesReader(SourceReaderContext context) {
               super(context);
           }
   
           @Override
           public InputStatus pollNext(ReaderOutput<RowData> output) {
               InputStatus status = super.pollNext(output);
               if (status.equals(InputStatus.MORE_AVAILABLE)) {
                   ((SourceOutputWithWatermarks) output).emitPeriodicWatermark();
               }
               return status;
           }
       }
   ```

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;
+    private final String tableName;
+    private final boolean isInsertOnly;
+    private final String runtimeSink;
+    private final int expectedNum;
+    private final Map<String, DataType> writableMetadata;
+    private final Integer parallelism;
+    private final ChangelogMode changelogModeEnforced;
+    private final int rowtimeIndex;
+
+    public TestValuesTableSink(
+            DataType consumedDataType,
+            int[] primaryKeyIndices,
+            String tableName,
+            boolean isInsertOnly,
+            String runtimeSink,
+            int expectedNum,
+            Map<String, DataType> writableMetadata,
+            @Nullable Integer parallelism,
+            @Nullable ChangelogMode changelogModeEnforced,
+            int rowtimeIndex) {
+        this.consumedDataType = consumedDataType;
+        this.primaryKeyIndices = primaryKeyIndices;
+        this.tableName = tableName;
+        this.isInsertOnly = isInsertOnly;
+        this.runtimeSink = runtimeSink;
+        this.expectedNum = expectedNum;
+        this.writableMetadata = writableMetadata;
+        this.parallelism = parallelism;
+        this.changelogModeEnforced = changelogModeEnforced;
+        this.rowtimeIndex = rowtimeIndex;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        // if param [changelogModeEnforced] is passed in, return it directly
+        if (changelogModeEnforced != null) {
+            return changelogModeEnforced;
+        }
+        if (isInsertOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            if (primaryKeyIndices.length > 0) {
+                // can update on key, ignore UPDATE_BEFORE
+                return ChangelogMode.upsert();
+            } else {
+                // don't have key, works in retract mode
+                return requestedMode;
+            }
+        }
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataStructureConverter converter = context.createDataStructureConverter(consumedDataType);
+        final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
+        final Boolean isEnforcedInsertOnly =
+                Optional.ofNullable(changelogModeEnforced)
+                        .map(changelogMode -> changelogMode.equals(ChangelogMode.insertOnly()))
+                        .orElse(false);
+        final Boolean isInsertOnly = isEnforcedInsertOnly || this.isInsertOnly;
+        if (isInsertOnly) {
+            Preconditions.checkArgument(
+                    expectedNum == -1,
+                    "Appending Sink doesn't support '"
+                            + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
+                            + "' yet.");
+            switch (runtimeSink) {
+                case "SinkFunction":
+                    return new SinkFunctionProvider() {
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+
+                        @Override
+                        public SinkFunction<RowData> createSinkFunction() {
+                            return new AppendingSinkFunction(tableName, converter, rowtimeIndex);
+                        }
+                    };
+                case "OutputFormat":
+                    return new OutputFormatProvider() {
+                        @Override
+                        public OutputFormat<RowData> createOutputFormat() {
+                            return new AppendingOutputFormat(tableName, converter);
+                        }
+
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+                    };
+                case "DataStream":
+                    return new DataStreamSinkProvider() {
+                        @Override
+                        public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
+                            return dataStream.addSink(
+                                    new AppendingSinkFunction(tableName, converter, rowtimeIndex));
+                        }
+
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+                    };
+
+                default:
+                    throw new IllegalArgumentException(
+                            "Unsupported runtime sink class: " + runtimeSink);
+            }
+        } else {
+            SinkFunction<RowData> sinkFunction;
+            if ("SinkWithCollectingWatermark".equals(runtimeSink)) {
+                sinkFunction = new CollectionWatermarkSinkFunction(tableName, converter);
+            } else {
+                // we don't support OutputFormat for updating query in the TestValues connector
+                assert runtimeSink.equals("SinkFunction");

Review comment:
       nit: It's better to use `Preconditions` rather than `assert`, because assertions are disabled by default.
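
    Something along these lines (sketch; the error message is only illustrative):

    ```
    // Sketch: fails regardless of whether assertions (-ea) are enabled.
    Preconditions.checkArgument(
            "SinkFunction".equals(runtimeSink),
            "Updating queries only support the 'SinkFunction' runtime sink, but got: " + runtimeSink);
    ```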

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;
+    private final String tableName;
+    private final boolean isInsertOnly;
+    private final String runtimeSink;
+    private final int expectedNum;
+    private final Map<String, DataType> writableMetadata;
+    private final Integer parallelism;
+    private final ChangelogMode changelogModeEnforced;
+    private final int rowtimeIndex;
+
+    public TestValuesTableSink(
+            DataType consumedDataType,
+            int[] primaryKeyIndices,
+            String tableName,
+            boolean isInsertOnly,
+            String runtimeSink,
+            int expectedNum,
+            Map<String, DataType> writableMetadata,
+            @Nullable Integer parallelism,
+            @Nullable ChangelogMode changelogModeEnforced,
+            int rowtimeIndex) {
+        this.consumedDataType = consumedDataType;
+        this.primaryKeyIndices = primaryKeyIndices;
+        this.tableName = tableName;
+        this.isInsertOnly = isInsertOnly;
+        this.runtimeSink = runtimeSink;
+        this.expectedNum = expectedNum;
+        this.writableMetadata = writableMetadata;
+        this.parallelism = parallelism;
+        this.changelogModeEnforced = changelogModeEnforced;
+        this.rowtimeIndex = rowtimeIndex;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        // if param [changelogModeEnforced] is passed in, return it directly
+        if (changelogModeEnforced != null) {
+            return changelogModeEnforced;
+        }
+        if (isInsertOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            if (primaryKeyIndices.length > 0) {
+                // can update on key, ignore UPDATE_BEFORE
+                return ChangelogMode.upsert();
+            } else {
+                // don't have key, works in retract mode
+                return requestedMode;
+            }
+        }
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataStructureConverter converter = context.createDataStructureConverter(consumedDataType);
+        final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
+        final Boolean isEnforcedInsertOnly =
+                Optional.ofNullable(changelogModeEnforced)
+                        .map(changelogMode -> changelogMode.equals(ChangelogMode.insertOnly()))
+                        .orElse(false);
+        final Boolean isInsertOnly = isEnforcedInsertOnly || this.isInsertOnly;
+        if (isInsertOnly) {
+            Preconditions.checkArgument(
+                    expectedNum == -1,
+                    "Appending Sink doesn't support '"
+                            + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
+                            + "' yet.");
+            switch (runtimeSink) {
+                case "SinkFunction":
+                    return new SinkFunctionProvider() {
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+
+                        @Override
+                        public SinkFunction<RowData> createSinkFunction() {
+                            return new AppendingSinkFunction(tableName, converter, rowtimeIndex);
+                        }
+                    };

Review comment:
       Use `SinkFunctionProvider#of` instead.
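
    For example (sketch; again assuming the overload that also takes the sink parallelism is available):

    ```
    // Sketch: the static factory replaces the anonymous SinkFunctionProvider.
    return SinkFunctionProvider.of(
            new AppendingSinkFunction(tableName, converter, rowtimeIndex), parallelism);
    ```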

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/sinkfunction/CollectionWatermarkSinkFunction.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.sinkfunction;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.LinkedList;
+
+/** A sink function to collect the watermark. */
+public class CollectionWatermarkSinkFunction extends RetractingSinkFunction {
+    private final DataStructureConverter converter;
+    private final String tableName;
+
+    public CollectionWatermarkSinkFunction(String tableName, DataStructureConverter converter) {
+        super(tableName, converter);
+        this.converter = converter;
+        this.tableName = tableName;
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) {
+        // wait to get periodic watermark
+        // the default interval time that generates periodic watermark is 200 ms
+        try {
+            Thread.sleep(300);
+        } catch (InterruptedException e) {
+            e.printStackTrace();

Review comment:
       I think it's better to just throw a `RuntimeException("Meet interruption during the runtime.", e)` instead of only printing the stack trace.
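
    A sketch of the catch block (restoring the interrupt flag is an extra defensive step on top of the literal suggestion):

    ```
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        // Sketch: restore the interrupt status and fail loudly instead of swallowing the exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Meet interruption during the runtime.", e);
    }
    ```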

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;
+    private final String tableName;
+    private final boolean isInsertOnly;
+    private final String runtimeSink;
+    private final int expectedNum;
+    private final Map<String, DataType> writableMetadata;
+    private final Integer parallelism;
+    private final ChangelogMode changelogModeEnforced;
+    private final int rowtimeIndex;
+
+    public TestValuesTableSink(
+            DataType consumedDataType,
+            int[] primaryKeyIndices,
+            String tableName,
+            boolean isInsertOnly,
+            String runtimeSink,
+            int expectedNum,
+            Map<String, DataType> writableMetadata,
+            @Nullable Integer parallelism,
+            @Nullable ChangelogMode changelogModeEnforced,
+            int rowtimeIndex) {
+        this.consumedDataType = consumedDataType;
+        this.primaryKeyIndices = primaryKeyIndices;
+        this.tableName = tableName;
+        this.isInsertOnly = isInsertOnly;
+        this.runtimeSink = runtimeSink;
+        this.expectedNum = expectedNum;
+        this.writableMetadata = writableMetadata;
+        this.parallelism = parallelism;
+        this.changelogModeEnforced = changelogModeEnforced;
+        this.rowtimeIndex = rowtimeIndex;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        // if param [changelogModeEnforced] is passed in, return it directly
+        if (changelogModeEnforced != null) {
+            return changelogModeEnforced;
+        }
+        if (isInsertOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            if (primaryKeyIndices.length > 0) {
+                // can update on key, ignore UPDATE_BEFORE
+                return ChangelogMode.upsert();
+            } else {
+                // don't have key, works in retract mode
+                return requestedMode;
+            }
+        }
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataStructureConverter converter = context.createDataStructureConverter(consumedDataType);
+        final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
+        final Boolean isEnforcedInsertOnly =
+                Optional.ofNullable(changelogModeEnforced)
+                        .map(changelogMode -> changelogMode.equals(ChangelogMode.insertOnly()))
+                        .orElse(false);
+        final Boolean isInsertOnly = isEnforcedInsertOnly || this.isInsertOnly;
+        if (isInsertOnly) {

Review comment:
       How about just moving the code for the insert-only sink functions into a new method, e.g. `createInsertOnlySinkFunction`? It would make this part more compact and tidy.
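
    A rough outline of the extraction (the method name and shape are only illustrative, the `DataStream` branch is omitted for brevity, and the parallelism-carrying `of` overloads are assumed to exist):

    ```
    // Sketch of a possible extraction; the caller would become:
    //   if (isInsertOnly) { return createInsertOnlySinkRuntimeProvider(converter); }
    private SinkRuntimeProvider createInsertOnlySinkRuntimeProvider(DataStructureConverter converter) {
        Preconditions.checkArgument(
                expectedNum == -1,
                "Appending Sink doesn't support '"
                        + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
                        + "' yet.");
        switch (runtimeSink) {
            case "SinkFunction":
                return SinkFunctionProvider.of(
                        new AppendingSinkFunction(tableName, converter, rowtimeIndex), parallelism);
            case "OutputFormat":
                return OutputFormatProvider.of(
                        new AppendingOutputFormat(tableName, converter), parallelism);
            default:
                throw new IllegalArgumentException("Unsupported runtime sink class: " + runtimeSink);
        }
    }
    ```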
   

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/sinkfunction/CollectionWatermarkSinkFunction.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.sinkfunction;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.LinkedList;
+
+/** A sink function to collect the watermark. */
+public class CollectionWatermarkSinkFunction extends RetractingSinkFunction {
+    private final DataStructureConverter converter;
+    private final String tableName;
+
+    public CollectionWatermarkSinkFunction(String tableName, DataStructureConverter converter) {
+        super(tableName, converter);
+        this.converter = converter;
+        this.tableName = tableName;
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) {
+        // wait to get periodic watermark
+        // the default interval time that generates periodic watermark is 200 ms
+        try {
+            Thread.sleep(300);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        TestValuesRuntimeFunctions.getWatermarkHistory()
+                .computeIfAbsent(tableName, k -> new LinkedList<>())
+                .add(new Watermark(context.currentWatermark()));
+        RowKind kind = value.getRowKind();
+        Row row = (Row) converter.toExternal(value);
+        assert row != null;
+        synchronized (TestValuesRuntimeFunctions.LOCK) {
+            localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
+            if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
+                row.setKind(RowKind.INSERT);
+                localRetractResult.add(row.toString());
+            } else {
+                row.setKind(RowKind.INSERT);
+                boolean contains = localRetractResult.remove(row.toString());
+                if (!contains) {
+                    throw new RuntimeException(
+                            "Tried to retract a value that wasn't inserted first. "
+                                    + "This is probably an incorrectly implemented test.");
+                }
+            }
+        }

Review comment:
       This code is almost the same as the existing sink functions. How about we just treat `CollectionWatermarkSinkFunction` as a wrapper and let the inner values sink function, e.g. `RetractingSinkFunction`, do the collecting work?
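
    A sketch of that wrapper style (assuming `RetractingSinkFunction#invoke` already does the raw/retract bookkeeping duplicated here; the `Thread.sleep` part is left out):

    ```
    @Override
    public void invoke(RowData value, Context context) {
        // Sketch: record the watermark, then delegate the result collection to the parent sink.
        TestValuesRuntimeFunctions.getWatermarkHistory()
                .computeIfAbsent(tableName, k -> new LinkedList<>())
                .add(new Watermark(context.currentWatermark()));
        super.invoke(value, context);
    }
    ```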

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala
##########
@@ -41,8 +42,155 @@ class SourceWatermarkITCase extends StreamingTestBase {
   @Rule
   def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
 
+  val valuesSourceData = Seq(
+    row(1, 1L, LocalDateTime.parse("2020-11-21T19:00:05.23")),
+    row(2, 2L, LocalDateTime.parse("2020-11-21T19:00:10.23")),
+    row(3, 3L, LocalDateTime.parse("2020-11-21T19:00:15.23")),
+    row(4, 4L, LocalDateTime.parse("2020-11-21T19:00:20.23"))
+  )
+
+  @Test
+  def testWatermarkPushDownInValuesSource(): Unit = {
+
+    val dataId = TestValuesTableFactory.registerData(valuesSourceData)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE ValuesSourceTable (
+         |  a INT,
+         |  b BIGINT,
+         |  c TIMESTAMP(3),
+         |  d as c - INTERVAL '5' second,
+         |  WATERMARK FOR d as d + INTERVAL '5' second) WITH(
+         |  'connector' = 'values',
+         |  'enable-watermark-push-down' = 'true',
+         |  'data-id' = '$dataId',
+         |  'enable-lookup' = 'false',
+         |  'runtime-source' = 'Source'
+         |)
+         |""".stripMargin)
+
+    // avoid to generate too many watermarks with timestamp Long.MinValue
+    tEnv.getConfig.getConfiguration
+      .set(CoreOptions.DEFAULT_PARALLELISM.asInstanceOf[ConfigOption[Any]], 1)
+
+    tEnv.executeSql(
+      s"""
+         | CREATE Table CollectingWatermarkSinkTable (
+         |   a INT,
+         |   b BIGINT,
+         |   c TIMESTAMP(3)
+         | ) with (
+         | 'connector' = 'values',
+         | 'sink-insert-only' = 'false',
+         | 'runtime-sink' = 'SinkWithCollectingWatermark'
+         | )
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        | INSERT INTO CollectingWatermarkSinkTable
+        | SELECT a, b, c FROM ValuesSourceTable
+        |""".stripMargin).await()
+
+    // the first watermark timestamp is always Long.MinValue
+    val expectedWatermarkOutput = Seq(
+      "+1705471-09-26T16:47:04.192",
+      "2020-11-21T19:00:05.230",
+      "2020-11-21T19:00:10.230",
+      "2020-11-21T19:00:15.230"
+    )
+    val expectedData = List(
+      "1,1,2020-11-21T19:00:05.230",
+      "2,2,2020-11-21T19:00:10.230",
+      "3,3,2020-11-21T19:00:15.230",
+      "4,4,2020-11-21T19:00:20.230"
+    )
+
+    val sinkName = "CollectingWatermarkSinkTable"
+
+    val result = TestValuesTableFactory.getResults(sinkName).asScala.toList
+
+    val actualWatermark = TestValuesTableFactory.getWatermarkOutput(sinkName)
+      .asScala
+      .map(x => TimestampData.fromEpochMillis(x.getTimestamp).toLocalDateTime.toString)
+      .toList
+
+    assertEquals(expectedWatermarkOutput, actualWatermark)
+    assertEquals(expectedData.sorted, result.sorted)
+  }
+
+  @Test
+  def testWatermarkAndFilterAndProjectionPushDownInValuesSource(): Unit = {
+
+    val dataId = TestValuesTableFactory.registerData(valuesSourceData)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE ValuesSourceTableWithFilterAndProjection (
+         |  a INT,
+         |  b BIGINT,
+         |  c TIMESTAMP(3),
+         |  d as c - INTERVAL '5' second,
+         |  WATERMARK FOR d as d + INTERVAL '5' second) WITH (
+         |  'connector' = 'values',
+         |  'enable-watermark-push-down' = 'true',
+         |  'data-id' = '$dataId',
+         |  'enable-lookup' = 'false',
+         |  'runtime-source' = 'Source',
+         |  'filterable-fields' = 'a'
+         |)
+         |""".stripMargin)
+
+    // avoid to generate too many watermarks with timestamp Long.MinValue
+    tEnv.getConfig.getConfiguration
+      .set(CoreOptions.DEFAULT_PARALLELISM.asInstanceOf[ConfigOption[Any]], 1)
+
+
+    tEnv.executeSql(
+      s"""
+         | CREATE Table CollectingWatermarkSinkTableWithFilterAndProjection (
+         |   a INT
+         | ) with (
+         | 'connector' = 'values',
+         | 'sink-insert-only' = 'false',
+         | 'runtime-sink' = 'SinkWithCollectingWatermark'
+         | )
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        | INSERT INTO CollectingWatermarkSinkTableWithFilterAndProjection
+        | SELECT a FROM ValuesSourceTableWithFilterAndProjection WHERE a > 2
+        |""".stripMargin).await()
+
+    // the first watermark timestamp is always Long.MinValue
+    val expectedWatermarkOutput = Seq(
+      "+1705471-09-26T16:47:04.192",
+      "2020-11-21T19:00:15.230"
+    )
+    val expectedData = List(
+      "3",
+      "4"
+    )
+
+    val sinkName = "CollectingWatermarkSinkTableWithFilterAndProjection"
+
+    val result = TestValuesTableFactory.getResults(sinkName).asScala.toList
+
+    val actualWatermark = TestValuesTableFactory.getWatermarkOutput(sinkName)
+      .asScala
+      .map(x => TimestampData.fromEpochMillis(x.getTimestamp).toLocalDateTime.toString)
+      .toList
+
+    assertEquals(expectedWatermarkOutput, actualWatermark)
+    assertEquals(expectedData.sorted, result.sorted)
+  }
+
+
+

Review comment:
       Multiple empty lines.

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesource/TestValuesScanTableSource.java
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesource;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.factories.TestValuesFactoryUtils;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.source.TestValuesSource;
+import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
+import org.apache.flink.table.planner.utils.FilterUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Values {@link ScanTableSource} for testing. */
+public class TestValuesScanTableSource
+        implements ScanTableSource,
+                SupportsProjectionPushDown,
+                SupportsFilterPushDown,
+                SupportsLimitPushDown,
+                SupportsPartitionPushDown,
+                SupportsReadingMetadata {
+
+    protected DataType producedDataType;
+    protected final ChangelogMode changelogMode;
+    protected final boolean bounded;
+    protected final String runtimeSource;
+    protected final boolean failingSource;
+    protected Map<Map<String, String>, Collection<Row>> data;
+
+    protected final boolean nestedProjectionSupported;
+    protected @Nullable int[][] projectedPhysicalFields;
+    protected List<ResolvedExpression> filterPredicates;
+    protected final Set<String> filterableFields;
+    protected long limit;
+    protected int numElementToSkip;
+    protected List<Map<String, String>> allPartitions;
+    protected final Map<String, DataType> readableMetadata;
+    protected @Nullable int[] projectedMetadataFields;
+
+    public TestValuesScanTableSource(
+            DataType producedDataType,
+            ChangelogMode changelogMode,
+            boolean bounded,
+            String runtimeSource,
+            boolean failingSource,
+            Map<Map<String, String>, Collection<Row>> data,
+            boolean nestedProjectionSupported,
+            @Nullable int[][] projectedPhysicalFields,
+            List<ResolvedExpression> filterPredicates,
+            Set<String> filterableFields,
+            int numElementToSkip,
+            long limit,
+            List<Map<String, String>> allPartitions,
+            Map<String, DataType> readableMetadata,
+            @Nullable int[] projectedMetadataFields) {
+        this.producedDataType = producedDataType;
+        this.changelogMode = changelogMode;
+        this.bounded = bounded;
+        this.runtimeSource = runtimeSource;
+        this.failingSource = failingSource;
+        this.data = data;
+        this.nestedProjectionSupported = nestedProjectionSupported;
+        this.projectedPhysicalFields = projectedPhysicalFields;
+        this.filterPredicates = filterPredicates;
+        this.filterableFields = filterableFields;
+        this.numElementToSkip = numElementToSkip;
+        this.limit = limit;
+        this.allPartitions = allPartitions;
+        this.readableMetadata = readableMetadata;
+        this.projectedMetadataFields = projectedMetadataFields;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return changelogMode;
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        TypeInformation<RowData> type =
+                runtimeProviderContext.createTypeInformation(producedDataType);
+        TypeSerializer<RowData> serializer = type.createSerializer(new ExecutionConfig());
+        DataStructureConverter converter =
+                runtimeProviderContext.createDataStructureConverter(producedDataType);
+        converter.open(
+                RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
+        Collection<RowData> values = convertToRowData(converter);
+
+        switch (runtimeSource) {
+            case "SourceFunction":
+                try {
+                    final SourceFunction<RowData> sourceFunction;
+                    if (failingSource) {
+                        sourceFunction =
+                                new FailingCollectionSource<>(
+                                        serializer, values, values.size() / 2);
+                    } else {
+                        sourceFunction = new FromElementsFunction<>(serializer, values);
+                    }
+                    return SourceFunctionProvider.of(sourceFunction, bounded);
+                } catch (IOException e) {
+                    throw new TableException("Fail to init source function", e);
+                }
+            case "InputFormat":
+                checkArgument(
+                        !failingSource,
+                        "Values InputFormat Source doesn't support as failing source.");
+                return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
+            case "DataStream":
+                checkArgument(
+                        !failingSource,
+                        "Values DataStream Source doesn't support as failing source.");
+                try {
+                    FromElementsFunction<RowData> function =
+                            new FromElementsFunction<>(serializer, values);
+                    return new DataStreamScanProvider() {
+                        @Override
+                        public DataStream<RowData> produceDataStream(
+                                StreamExecutionEnvironment execEnv) {
+                            return execEnv.addSource(function);
+                        }
+
+                        @Override
+                        public boolean isBounded() {
+                            return bounded;
+                        }
+                    };
+                } catch (IOException e) {
+                    throw new TableException("Fail to init data stream source", e);
+                }
+            case "Source":
+                checkArgument(
+                        !failingSource,
+                        "Values DataStream Source doesn't support as failing source.");
+                try {
+                    return SourceProvider.of(
+                            new TestValuesSource(serializer, values, values.size()));
+                } catch (IOException e) {
+                    throw new TableException("Fail to init source", e);
+                }
+
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported runtime source class: " + runtimeSource);
+        }
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        return nestedProjectionSupported;
+    }
+
+    @Override
+    public void applyProjection(int[][] projectedFields) {
+        this.producedDataType = DataTypeUtils.projectRow(producedDataType, projectedFields);
+        this.projectedPhysicalFields = projectedFields;
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+        for (ResolvedExpression expr : filters) {
+            if (FilterUtils.shouldPushDown(expr, filterableFields)) {
+                acceptedFilters.add(expr);
+            } else {
+                remainingFilters.add(expr);
+            }
+        }
+        this.filterPredicates = acceptedFilters;
+        return Result.of(acceptedFilters, remainingFilters);
+    }
+
+    private Function<String, Comparable<?>> getValueGetter(Row row) {
+        final List<String> fieldNames = DataTypeUtils.flattenToNames(producedDataType);
+        return fieldName -> {
+            int idx = fieldNames.indexOf(fieldName);
+            return (Comparable<?>) row.getField(idx);
+        };
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new TestValuesScanTableSource(
+                producedDataType,
+                changelogMode,
+                bounded,
+                runtimeSource,
+                failingSource,
+                data,
+                nestedProjectionSupported,
+                projectedPhysicalFields,
+                filterPredicates,
+                filterableFields,
+                numElementToSkip,
+                limit,
+                allPartitions,
+                readableMetadata,
+                projectedMetadataFields);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TestValues";

Review comment:
       Should this be "TestValuesSource"?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesink/TestValuesTableSink.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingOutputFormat;
+import org.apache.flink.table.planner.factories.sinkfunction.AppendingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.CollectionWatermarkSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.KeyedUpsertingSinkFunction;
+import org.apache.flink.table.planner.factories.sinkfunction.RetractingSinkFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Values {@link DynamicTableSink} for testing. */
+public class TestValuesTableSink
+        implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning {
+
+    private DataType consumedDataType;
+    private int[] primaryKeyIndices;
+    private final String tableName;
+    private final boolean isInsertOnly;
+    private final String runtimeSink;
+    private final int expectedNum;
+    private final Map<String, DataType> writableMetadata;
+    private final Integer parallelism;
+    private final ChangelogMode changelogModeEnforced;
+    private final int rowtimeIndex;
+
+    public TestValuesTableSink(
+            DataType consumedDataType,
+            int[] primaryKeyIndices,
+            String tableName,
+            boolean isInsertOnly,
+            String runtimeSink,
+            int expectedNum,
+            Map<String, DataType> writableMetadata,
+            @Nullable Integer parallelism,
+            @Nullable ChangelogMode changelogModeEnforced,
+            int rowtimeIndex) {
+        this.consumedDataType = consumedDataType;
+        this.primaryKeyIndices = primaryKeyIndices;
+        this.tableName = tableName;
+        this.isInsertOnly = isInsertOnly;
+        this.runtimeSink = runtimeSink;
+        this.expectedNum = expectedNum;
+        this.writableMetadata = writableMetadata;
+        this.parallelism = parallelism;
+        this.changelogModeEnforced = changelogModeEnforced;
+        this.rowtimeIndex = rowtimeIndex;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        // if param [changelogModeEnforced] is passed in, return it directly
+        if (changelogModeEnforced != null) {
+            return changelogModeEnforced;
+        }
+        if (isInsertOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            if (primaryKeyIndices.length > 0) {
+                // can update on key, ignore UPDATE_BEFORE
+                return ChangelogMode.upsert();
+            } else {
+                // don't have key, works in retract mode
+                return requestedMode;
+            }
+        }
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DataStructureConverter converter = context.createDataStructureConverter(consumedDataType);
+        final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
+        final Boolean isEnforcedInsertOnly =
+                Optional.ofNullable(changelogModeEnforced)
+                        .map(changelogMode -> changelogMode.equals(ChangelogMode.insertOnly()))
+                        .orElse(false);
+        final Boolean isInsertOnly = isEnforcedInsertOnly || this.isInsertOnly;
+        if (isInsertOnly) {
+            Preconditions.checkArgument(
+                    expectedNum == -1,
+                    "Appending Sink doesn't support '"
+                            + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
+                            + "' yet.");
+            switch (runtimeSink) {
+                case "SinkFunction":
+                    return new SinkFunctionProvider() {
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+
+                        @Override
+                        public SinkFunction<RowData> createSinkFunction() {
+                            return new AppendingSinkFunction(tableName, converter, rowtimeIndex);
+                        }
+                    };
+                case "OutputFormat":
+                    return new OutputFormatProvider() {
+                        @Override
+                        public OutputFormat<RowData> createOutputFormat() {
+                            return new AppendingOutputFormat(tableName, converter);
+                        }
+
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+                    };
+                case "DataStream":
+                    return new DataStreamSinkProvider() {
+                        @Override
+                        public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
+                            return dataStream.addSink(
+                                    new AppendingSinkFunction(tableName, converter, rowtimeIndex));
+                        }
+
+                        @Override
+                        public Optional<Integer> getParallelism() {
+                            return parallelismOption;
+                        }
+                    };
+
+                default:
+                    throw new IllegalArgumentException(
+                            "Unsupported runtime sink class: " + runtimeSink);
+            }
+        } else {
+            SinkFunction<RowData> sinkFunction;
+            if ("SinkWithCollectingWatermark".equals(runtimeSink)) {
+                sinkFunction = new CollectionWatermarkSinkFunction(tableName, converter);
+            } else {
+                // we don't support OutputFormat for updating query in the TestValues connector
+                assert runtimeSink.equals("SinkFunction");
+
+                if (primaryKeyIndices.length > 0) {
+                    sinkFunction =
+                            new KeyedUpsertingSinkFunction(
+                                    tableName, converter, primaryKeyIndices, expectedNum);
+                } else {
+                    Preconditions.checkArgument(
+                            expectedNum == -1,
+                            "Retracting Sink doesn't support '"
+                                    + TestValuesTableFactory.SINK_EXPECTED_MESSAGES_NUM.key()
+                                    + "' yet.");
+                    sinkFunction = new RetractingSinkFunction(tableName, converter);
+                }
+            }
+            return SinkFunctionProvider.of(sinkFunction);
+        }
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new TestValuesTableSink(
+                consumedDataType,
+                primaryKeyIndices,
+                tableName,
+                isInsertOnly,
+                runtimeSink,
+                expectedNum,
+                writableMetadata,
+                parallelism,
+                changelogModeEnforced,
+                rowtimeIndex);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TestValues";

Review comment:
       "TestValuesSink"?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/dynamictablesource/MockedLookupTableSource.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.dynamictablesource;
+
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+
+/** A mocked {@link LookupTableSource} for validation test. */
+public class MockedLookupTableSource implements LookupTableSource {
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        return null;

Review comment:
       nit: It's better to just throw an `UnsupportedOperationException("Not implemented.")` if no one cares about the implementation. If somebody really does care about the implementation, they will get concrete exception details rather than an NPE.
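       For example, a minimal sketch of the suggestion (the exact message text is just illustrative):

           @Override
           public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
               // fail fast with a descriptive error instead of returning null,
               // so misuse surfaces immediately rather than as a later NPE
               throw new UnsupportedOperationException("Not implemented.");
           }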

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }

Review comment:
       It seems that no one uses this constructor. Can we remove it?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestValuesSplit> createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> createEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext) throws IOException {
+        final int currentParallelism = enumContext.currentParallelism();
+
+        if (elements == null) {
+            deserializeElements();
+        }
+
+        final TestValuesIterator[] subIterators =
+                new TestValuesIterator(elements.iterator()).split(currentParallelism);
+
+        final List<TestValuesSplit> splits = new ArrayList<>(subIterators.length);
+        int splitId = 1;
+        for (TestValuesIterator it : subIterators) {
+            if (it.hasNext()) {
+                splits.add(new TestValuesSplit(String.valueOf(splitId++), it.getElements()));
+            }
+        }
+
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> restoreEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext,
+            Collection<TestValuesSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestValuesSplit> getSplitSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceSerializer(serializer);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<TestValuesSplit>>
+            getEnumeratorCheckpointSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceCheckpointSerializer(serializer);
+    }
+
+    /** A source split for {@link TestValuesSource}. */
+    public static class TestValuesSplit
+            implements IteratorSourceSplit<RowData, TestValuesIterator> {
+
+        private final String splitId;
+        private final Iterator<RowData> elements;
+
+        public TestValuesSplit(String splitId, Iterator<RowData> elements) {
+            this.splitId = splitId;
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator getIterator() {
+            return new TestValuesIterator(elements);
+        }
+
+        @Override
+        public IteratorSourceSplit<RowData, TestValuesIterator> getUpdatedSplitForIterator(
+                TestValuesIterator iterator) {
+            return new TestValuesSplit(splitId, elements);
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+    }
+
+    /** A source elements iterator in {@link TestValuesSource}. */
+    public static class TestValuesIterator extends SplittableIterator<RowData> {
+
+        private final Iterator<RowData> elements;
+
+        public TestValuesIterator(Iterator<RowData> elements) {
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions must be at least 1.");
+            }
+
+            if (numPartitions == 1) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            int elementsPerSplit;
+
+            List<RowData> values = IteratorUtils.toList(elements);
+
+            if (values.size() == 0) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            elementsPerSplit = values.size() / numPartitions;
+
+            // figure out how many partitions get one in addition
+            long numWithExtra = values.size() - elementsPerSplit * numPartitions;
+
+            List<RowData>[] splitLists = new List[numPartitions];
+
+            int curr = 0;
+            int i = 0;
+            for (; i < numWithExtra; i++) {
+                int next = curr + elementsPerSplit + 1;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+            for (; i < numPartitions; i++) {
+                int next = curr + elementsPerSplit;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }

Review comment:
       This only works for insert-only elements. We may need to add some checks to prevent wrong usage, see the sketch below.
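       A possible guard, assuming the splitting logic is only valid for insert-only data (`RowKind` here is `org.apache.flink.types.RowKind`):

           for (RowData value : values) {
               // reject changelog rows early instead of silently splitting them
               Preconditions.checkArgument(
                       value.getRowKind() == RowKind.INSERT,
                       "TestValuesIterator#split only supports insert-only elements, but got: %s",
                       value.getRowKind());
           }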

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestValuesSplit> createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> createEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext) throws IOException {
+        final int currentParallelism = enumContext.currentParallelism();
+
+        if (elements == null) {
+            deserializeElements();
+        }
+
+        final TestValuesIterator[] subIterators =
+                new TestValuesIterator(elements.iterator()).split(currentParallelism);
+
+        final List<TestValuesSplit> splits = new ArrayList<>(subIterators.length);
+        int splitId = 1;
+        for (TestValuesIterator it : subIterators) {
+            if (it.hasNext()) {
+                splits.add(new TestValuesSplit(String.valueOf(splitId++), it.getElements()));
+            }
+        }
+
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> restoreEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext,
+            Collection<TestValuesSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestValuesSplit> getSplitSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceSerializer(serializer);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<TestValuesSplit>>
+            getEnumeratorCheckpointSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceCheckpointSerializer(serializer);
+    }
+
+    /** A source split for {@link TestValuesSource}. */
+    public static class TestValuesSplit
+            implements IteratorSourceSplit<RowData, TestValuesIterator> {
+
+        private final String splitId;
+        private final Iterator<RowData> elements;
+
+        public TestValuesSplit(String splitId, Iterator<RowData> elements) {
+            this.splitId = splitId;
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator getIterator() {
+            return new TestValuesIterator(elements);
+        }
+
+        @Override
+        public IteratorSourceSplit<RowData, TestValuesIterator> getUpdatedSplitForIterator(
+                TestValuesIterator iterator) {
+            return new TestValuesSplit(splitId, elements);
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+    }
+
+    /** A source elements iterator in {@link TestValuesSource}. */
+    public static class TestValuesIterator extends SplittableIterator<RowData> {
+
+        private final Iterator<RowData> elements;
+
+        public TestValuesIterator(Iterator<RowData> elements) {
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions must be at least 1.");
+            }
+
+            if (numPartitions == 1) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            int elementsPerSplit;
+
+            List<RowData> values = IteratorUtils.toList(elements);
+
+            if (values.size() == 0) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            elementsPerSplit = values.size() / numPartitions;
+
+            // figure out how many partitions get one in addition
+            long numWithExtra = values.size() - elementsPerSplit * numPartitions;
+
+            List<RowData>[] splitLists = new List[numPartitions];
+
+            int curr = 0;
+            int i = 0;
+            for (; i < numWithExtra; i++) {
+                int next = curr + elementsPerSplit + 1;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+            for (; i < numPartitions; i++) {
+                int next = curr + elementsPerSplit;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+
+            TestValuesIterator[] iters = new TestValuesIterator[numPartitions];
+            for (int j = 0; j < splitLists.length; j++) {
+                iters[j] = new TestValuesIterator(new ArrayList<>(splitLists[j]).iterator());
+            }
+
+            return iters;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            return IteratorUtils.toList(elements).size();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return elements.hasNext();
+        }
+
+        @Override
+        public RowData next() {
+            return elements.next();
+        }
+
+        public Iterator<RowData> getElements() {
+            return elements;
+        }
+    }
+
+    /** A element serializer for {@link TestValuesSource}. */
+    private static class TestValuesSourceSerializer
+            implements SimpleVersionedSerializer<TestValuesSplit> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        private final TypeSerializer<RowData> serializer;
+
+        public TestValuesSourceSerializer(TypeSerializer<RowData> serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(TestValuesSplit split) throws IOException {
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
+            serializeSplit(wrapper, split, serializer);
+
+            return out.toByteArray();
+        }
+
+        public static void serializeSplit(
+                DataOutputViewStreamWrapper wrapper,
+                TestValuesSplit split,
+                TypeSerializer<RowData> serializer)
+                throws IOException {
+
+            wrapper.writeUTF(split.splitId());
+            List<RowData> list = IteratorUtils.toList(split.getIterator());

Review comment:
       nit: Please name the variable after its meaning rather than its data structure, which is more descriptive. Here I think `elements` would be better.
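       i.e. just the rename, the rest of the method stays as-is:

           List<RowData> elements = IteratorUtils.toList(split.getIterator());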

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestValuesSplit> createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> createEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext) throws IOException {
+        final int currentParallelism = enumContext.currentParallelism();
+
+        if (elements == null) {

Review comment:
       Why do we only deserialize the elements when `elements` is null? Can we always deserialize the elements? See the sketch below.
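       i.e. drop the null check at the top of `createEnumerator` and always rebuild from the serialized bytes (a sketch, assuming that is the intended behaviour):

           // the transient `elements` field does not survive shipping the source to the
           // cluster, so rebuilding from `elementsSerialized` unconditionally is simpler
           deserializeElements();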

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;

Review comment:
       `isBounded` is always true. Also, the branches look swapped: a bounded source should report `Boundedness.BOUNDED` (see the sketch below).
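       If the intent is to eventually support both modes, a minimal sketch (the extra constructor parameter is only an assumption on my side):

           private final boolean isBounded;

           public TestValuesSource(
                   TypeSerializer<RowData> serializer,
                   Iterable<RowData> elements,
                   int elementNums,
                   boolean isBounded)
                   throws IOException {
               this.serializer = serializer;
               this.elements = elements;
               this.elementNums = elementNums;
               this.isBounded = isBounded;
               serializeElements();
           }

           @Override
           public Boundedness getBoundedness() {
               // a bounded source reports BOUNDED, an unbounded one CONTINUOUS_UNBOUNDED
               return isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
           }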

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
##########
@@ -131,9 +131,54 @@ class SourceWatermarkTest extends TableTestBase {
          |   'connector' = 'values',
          |   'enable-watermark-push-down' = 'true',
          |   'bounded' = 'false',
-         |   'disable-lookup' = 'true'
+         |   'enable-lookup' = 'false'
          | )
          """.stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        | CREATE TABLE ValueSourceTable(
+        |   a INT,
+        |   b BIGINT,
+        |   c TIMESTAMP(3),
+        |   d AS c + INTERVAL '5' SECOND,
+        |   WATERMARK FOR d AS d - INTERVAL '5' SECOND
+        | ) WITH (
+        |   'connector' = 'values',
+        |   'enable-watermark-push-down' = 'true',
+        |   'bounded' = 'false',
+        |   'enable-lookup' = 'false',
+        |   'runtime-source' = 'Source'
+        | )
+        |""".stripMargin)

Review comment:
       It seems the `ValueSourceTable` is useless. Can we just remove it?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestValuesSplit> createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> createEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext) throws IOException {
+        final int currentParallelism = enumContext.currentParallelism();
+
+        if (elements == null) {
+            deserializeElements();
+        }
+
+        final TestValuesIterator[] subIterators =
+                new TestValuesIterator(elements.iterator()).split(currentParallelism);
+
+        final List<TestValuesSplit> splits = new ArrayList<>(subIterators.length);
+        int splitId = 1;
+        for (TestValuesIterator it : subIterators) {
+            if (it.hasNext()) {
+                splits.add(new TestValuesSplit(String.valueOf(splitId++), it.getElements()));
+            }
+        }
+
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> restoreEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext,
+            Collection<TestValuesSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestValuesSplit> getSplitSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceSerializer(serializer);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<TestValuesSplit>>
+            getEnumeratorCheckpointSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceCheckpointSerializer(serializer);
+    }
+
+    /** A source split for {@link TestValuesSource}. */
+    public static class TestValuesSplit
+            implements IteratorSourceSplit<RowData, TestValuesIterator> {
+
+        private final String splitId;
+        private final Iterator<RowData> elements;
+
+        public TestValuesSplit(String splitId, Iterator<RowData> elements) {
+            this.splitId = splitId;
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator getIterator() {
+            return new TestValuesIterator(elements);
+        }
+
+        @Override
+        public IteratorSourceSplit<RowData, TestValuesIterator> getUpdatedSplitForIterator(
+                TestValuesIterator iterator) {
+            return new TestValuesSplit(splitId, elements);
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+    }
+
+    /** A source elements iterator in {@link TestValuesSource}. */
+    public static class TestValuesIterator extends SplittableIterator<RowData> {
+
+        private final Iterator<RowData> elements;
+
+        public TestValuesIterator(Iterator<RowData> elements) {
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions must be at least 1.");
+            }
+
+            if (numPartitions == 1) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            int elementsPerSplit;
+
+            List<RowData> values = IteratorUtils.toList(elements);
+
+            if (values.size() == 0) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            elementsPerSplit = values.size() / numPartitions;
+
+            // figure out how many partitions get one extra element
+            long numWithExtra = values.size() - elementsPerSplit * numPartitions;
+
+            List<RowData>[] splitLists = new List[numPartitions];
+
+            int curr = 0;
+            int i = 0;
+            for (; i < numWithExtra; i++) {
+                int next = curr + elementsPerSplit + 1;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+            for (; i < numPartitions; i++) {
+                int next = curr + elementsPerSplit;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+
+            TestValuesIterator[] iters = new TestValuesIterator[numPartitions];
+            for (int j = 0; j < splitLists.length; j++) {
+                iters[j] = new TestValuesIterator(new ArrayList<>(splitLists[j]).iterator());
+            }
+
+            return iters;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            return IteratorUtils.toList(elements).size();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return elements.hasNext();
+        }
+
+        @Override
+        public RowData next() {
+            return elements.next();
+        }
+
+        public Iterator<RowData> getElements() {
+            return elements;
+        }
+    }
+
+    /** An element serializer for {@link TestValuesSource}. */
+    private static class TestValuesSourceSerializer

Review comment:
       nit: It would be better to rename this to `TestValuesSplitSerializer`, since it serializes splits rather than the source itself.

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/source/TestValuesSource.java
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SplittableIterator;
+
+import org.apache.commons.collections.IteratorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/** A data source implementation for {@link TestValuesTableFactory}. */
+public class TestValuesSource
+        implements Source<
+                RowData,
+                TestValuesSource.TestValuesSplit,
+                Collection<TestValuesSource.TestValuesSplit>> {
+
+    private final boolean isBounded = true;
+
+    private transient Iterable<RowData> elements;
+
+    private int elementNums;
+
+    private byte[] elementsSerialized;
+
+    private TypeSerializer<RowData> serializer;
+
+    public TestValuesSource(
+            TypeSerializer<RowData> serializer, Iterable<RowData> elements, int elementNums)
+            throws IOException {
+        this.serializer = serializer;
+        this.elements = elements;
+        this.elementNums = elementNums;
+        serializeElements();
+    }
+
+    public TestValuesSource(Iterable<RowData> elements) throws IOException {
+        this.elements = elements;
+        serializeElements();
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        try {
+            for (RowData element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (IOException e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    private void deserializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        Preconditions.checkState(
+                elementsSerialized != null && elementsSerialized.length != 0,
+                "elementsSerialized doesn't exist");
+        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
+        final DataInputView input = new DataInputViewStreamWrapper(bais);
+
+        List<RowData> elements = new ArrayList<>();
+
+        int index = 0;
+        while (index < elementNums) {
+            try {
+                RowData element = serializer.deserialize(input);
+                elements.add(element);
+                index++;
+            } catch (IOException e) {
+                throw new IOException(
+                        "Deserializing the source elements failed: " + e.getMessage(), e);
+            }
+        }
+        this.elements = elements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, TestValuesSplit> createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> createEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext) throws IOException {
+        final int currentParallelism = enumContext.currentParallelism();
+
+        if (elements == null) {
+            deserializeElements();
+        }
+
+        final TestValuesIterator[] subIterators =
+                new TestValuesIterator(elements.iterator()).split(currentParallelism);
+
+        final List<TestValuesSplit> splits = new ArrayList<>(subIterators.length);
+        int splitId = 1;
+        for (TestValuesIterator it : subIterators) {
+            if (it.hasNext()) {
+                splits.add(new TestValuesSplit(String.valueOf(splitId++), it.getElements()));
+            }
+        }
+
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<TestValuesSplit, Collection<TestValuesSplit>> restoreEnumerator(
+            SplitEnumeratorContext<TestValuesSplit> enumContext,
+            Collection<TestValuesSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<TestValuesSplit> getSplitSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceSerializer(serializer);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<TestValuesSplit>>
+            getEnumeratorCheckpointSerializer() {
+        Preconditions.checkState(serializer != null, "serializer is not set");
+        return new TestValuesSourceCheckpointSerializer(serializer);
+    }
+
+    /** A source split for {@link TestValuesSource}. */
+    public static class TestValuesSplit
+            implements IteratorSourceSplit<RowData, TestValuesIterator> {
+
+        private final String splitId;
+        private final Iterator<RowData> elements;
+
+        public TestValuesSplit(String splitId, Iterator<RowData> elements) {
+            this.splitId = splitId;
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator getIterator() {
+            return new TestValuesIterator(elements);
+        }
+
+        @Override
+        public IteratorSourceSplit<RowData, TestValuesIterator> getUpdatedSplitForIterator(
+                TestValuesIterator iterator) {
+            return new TestValuesSplit(splitId, elements);
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+    }
+
+    /** A source elements iterator in {@link TestValuesSource}. */
+    public static class TestValuesIterator extends SplittableIterator<RowData> {
+
+        private final Iterator<RowData> elements;
+
+        public TestValuesIterator(Iterator<RowData> elements) {
+            this.elements = elements;
+        }
+
+        @Override
+        public TestValuesIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions must be at least 1.");
+            }
+
+            if (numPartitions == 1) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            int elementsPerSplit;
+
+            List<RowData> values = IteratorUtils.toList(elements);
+
+            if (values.size() == 0) {
+                return new TestValuesIterator[] {new TestValuesIterator(elements)};
+            }
+
+            elementsPerSplit = values.size() / numPartitions;
+
+            // figure out how many partitions get one extra element
+            long numWithExtra = values.size() - elementsPerSplit * numPartitions;
+
+            List<RowData>[] splitLists = new List[numPartitions];
+
+            int curr = 0;
+            int i = 0;
+            for (; i < numWithExtra; i++) {
+                int next = curr + elementsPerSplit + 1;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+            for (; i < numPartitions; i++) {
+                int next = curr + elementsPerSplit;
+                splitLists[i] = values.subList(curr, next);
+                curr = next;
+            }
+
+            TestValuesIterator[] iters = new TestValuesIterator[numPartitions];
+            for (int j = 0; j < splitLists.length; j++) {
+                iters[j] = new TestValuesIterator(new ArrayList<>(splitLists[j]).iterator());
+            }
+
+            return iters;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            return IteratorUtils.toList(elements).size();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return elements.hasNext();
+        }
+
+        @Override
+        public RowData next() {
+            return elements.next();
+        }
+
+        public Iterator<RowData> getElements() {
+            return elements;
+        }
+    }
+
+    /** An element serializer for {@link TestValuesSource}. */
+    private static class TestValuesSourceSerializer
+            implements SimpleVersionedSerializer<TestValuesSplit> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        private final TypeSerializer<RowData> serializer;
+
+        public TestValuesSourceSerializer(TypeSerializer<RowData> serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(TestValuesSplit split) throws IOException {
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
+            serializeSplit(wrapper, split, serializer);
+
+            return out.toByteArray();
+        }
+
+        public static void serializeSplit(
+                DataOutputViewStreamWrapper wrapper,
+                TestValuesSplit split,
+                TypeSerializer<RowData> serializer)
+                throws IOException {
+
+            wrapper.writeUTF(split.splitId());
+            List<RowData> list = IteratorUtils.toList(split.getIterator());
+            wrapper.writeInt(list.size());
+            Iterator<RowData> iterator = list.iterator();
+
+            while (iterator.hasNext()) {
+                RowData element = iterator.next();
+                serializer.serialize(element, wrapper);
+            }
+        }
+
+        @Override
+        public TestValuesSplit deserialize(int version, byte[] serialized) throws IOException {
+            if (version != CURRENT_VERSION) {
+                throw new IOException("Unrecognized version: " + version);
+            }
+            final ByteArrayInputStream in = new ByteArrayInputStream(serialized);
+            final DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
+
+            return deserializeSplit(wrapper, serializer);
+        }
+
+        public static TestValuesSplit deserializeSplit(
+                DataInputViewStreamWrapper wrapper, TypeSerializer<RowData> serializer)
+                throws IOException {
+
+            final String splitId = wrapper.readUTF();
+            final int count = wrapper.readInt();
+            final List<RowData> elements = new ArrayList<>();
+            int index = 0;
+            while (index < count) {
+                elements.add(serializer.deserialize(wrapper));
+                index++;
+            }
+
+            return new TestValuesSplit(splitId, elements.iterator());
+        }
+    }
+
+    /** A checkpoint serializer for {@link TestValuesSource}. */
+    public static class TestValuesSourceCheckpointSerializer

Review comment:
       nit: It would be better to rename this to `TestValuesSourceEnumeratorSerializer`, since it serializes the enumerator's checkpoint state.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala
##########
@@ -41,8 +42,155 @@ class SourceWatermarkITCase extends StreamingTestBase {
   @Rule
   def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
 
+  val valuesSourceData = Seq(
+    row(1, 1L, LocalDateTime.parse("2020-11-21T19:00:05.23")),
+    row(2, 2L, LocalDateTime.parse("2020-11-21T19:00:10.23")),
+    row(3, 3L, LocalDateTime.parse("2020-11-21T19:00:15.23")),
+    row(4, 4L, LocalDateTime.parse("2020-11-21T19:00:20.23"))
+  )
+
+  @Test
+  def testWatermarkPushDownInValuesSource(): Unit = {
+
+    val dataId = TestValuesTableFactory.registerData(valuesSourceData)
+
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE ValuesSourceTable (
+         |  a INT,
+         |  b BIGINT,
+         |  c TIMESTAMP(3),
+         |  d as c - INTERVAL '5' second,
+         |  WATERMARK FOR d as d + INTERVAL '5' second) WITH(
+         |  'connector' = 'values',
+         |  'enable-watermark-push-down' = 'true',
+         |  'data-id' = '$dataId',
+         |  'enable-lookup' = 'false',
+         |  'runtime-source' = 'Source'
+         |)
+         |""".stripMargin)
+
+    // avoid generating too many watermarks with timestamp Long.MinValue
+    tEnv.getConfig.getConfiguration
+      .set(CoreOptions.DEFAULT_PARALLELISM.asInstanceOf[ConfigOption[Any]], 1)
+
+    tEnv.executeSql(
+      s"""
+         | CREATE TABLE CollectingWatermarkSinkTable (
+         |   a INT,
+         |   b BIGINT,
+         |   c TIMESTAMP(3)
+         | ) with (
+         | 'connector' = 'values',
+         | 'sink-insert-only' = 'false',
+         | 'runtime-sink' = 'SinkWithCollectingWatermark'
+         | )
+         |""".stripMargin)
+
+    tEnv.executeSql(
+      """
+        | INSERT INTO CollectingWatermarkSinkTable
+        | SELECT a, b, c FROM ValuesSourceTable
+        |""".stripMargin).await()
+
+    // the first watermark timestamp is always Long.MinValue
+    val expectedWatermarkOutput = Seq(
+      "+1705471-09-26T16:47:04.192",

Review comment:
       Use `TimestampData.fromEpochMillis(Long.MinValue).toString`, which is more readable than the hard-coded timestamp literal.
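
       For illustration, a minimal Scala sketch of this suggestion (it assumes the `org.apache.flink.table.data.TimestampData` API and, per the reviewer, that its string form equals the literal used in the test; the object name is hypothetical):

       ```scala
       import org.apache.flink.table.data.TimestampData

       // Hypothetical helper (not part of the PR): derive the initial watermark string
       // from Long.MinValue instead of hard-coding the overflowed timestamp literal.
       object InitialWatermarkExample {
         val initialWatermark: String =
           TimestampData.fromEpochMillis(Long.MinValue).toString

         def main(args: Array[String]): Unit = {
           // Per the reviewer's suggestion, this should print the same value as the
           // "+1705471-09-26T16:47:04.192" entry in expectedWatermarkOutput.
           println(initialWatermark)
         }
       }
       ```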




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org