Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/16 13:29:35 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12150: [FLINK-17026][kafka] Introduce a new Kafka connector with new proper…

wuchong commented on a change in pull request #12150:
URL: https://github.com/apache/flink/pull/12150#discussion_r426140408



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")

Review comment:
       FLIP-122 proposes to use `scan.startup.mode` to make the options more hierarchical.
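       For illustration, a minimal sketch of the renamed option, assuming the FLIP-122 key (everything else copied from the declaration above):

```java
// Sketch only: the key is renamed to the hierarchical form proposed by FLIP-122.
public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
		.key("scan.startup.mode")
		.stringType()
		.defaultValue("group-offsets")
		.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
				+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
				+ "or \"specific-offsets\"");
```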

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")

Review comment:
       Remove this. `zookeeper.connect` is no longer needed for Kafka 0.8+.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();

Review comment:
       Please add a description for this option.
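       For example, something along these lines (the wording is only a suggestion):

```java
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
		.key("scan.startup.timestamp-millis")
		.longType()
		.noDefaultValue()
		.withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
```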

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();
+
+	// --------------------------------------------------------------------------------------------
+	// Sink specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
+			.key("sink.partitioner")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional output partitioning from Flink's partitions\n"
+					+ "into Kafka's partitions valid enumerations are\n"
+					+ "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
+					+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+					+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+
+	// --------------------------------------------------------------------------------------------
+	// Option enumerations
+	// --------------------------------------------------------------------------------------------
+
+	// Start up offset.
+	public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+	private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+			SCAN_STARTUP_MODE_VALUE_EARLIEST,
+			SCAN_STARTUP_MODE_VALUE_LATEST,
+			SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+	// Sink partitioner.
+	public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+	private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
+			SINK_PARTITIONER_VALUE_FIXED,
+			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+	// Prefix for Kafka specific properties.
+	public static final String PROPERTIES = "properties";

Review comment:
       Use "properties."? Otherwise, `properties-abc.xyz` will also be matched.
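       A sketch of what that could look like; the constant name PROPERTIES_PREFIX and the helper shown here are only illustrative:

```java
// Prefix for Kafka specific properties; the trailing dot ensures that keys
// such as "properties-abc.xyz" are not matched by accident.
public static final String PROPERTIES_PREFIX = "properties.";

private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
	return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
```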

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSourceBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link ScanTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+@Internal
+public abstract class KafkaScanSourceBase implements ScanTableSource {
+
+	// --------------------------------------------------------------------------------------------
+	// Common attributes
+	// --------------------------------------------------------------------------------------------
+	protected final DataType outputDataType;
+
+	// --------------------------------------------------------------------------------------------
+	// Scan format attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** Scan format for decoding records from Kafka. */
+	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka-specific attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** The Kafka topic to consume. */
+	protected final String topic;
+
+	/** Properties for the Kafka consumer. */
+	protected final Properties properties;
+
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	protected final StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+	/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+	protected final long startupTimestampMillis;
+
+	/** The default value when startup timestamp is not used.*/
+	private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source produced data type
+	 * @param topic                  Kafka topic to consume.
+	 * @param properties             Properties for the Kafka consumer.
+	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param startupMode            Startup mode for the contained consumer.
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+		this.outputDataType = Preconditions.checkNotNull(
+				outputDataType, "Produced data type must not be null.");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.scanFormat = Preconditions.checkNotNull(
+				scanFormat, "Scan format must not be null.");
+		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+		this.specificStartupOffsets = Preconditions.checkNotNull(
+			specificStartupOffsets, "Specific offsets must not be null.");
+		this.startupTimestampMillis = startupTimestampMillis;
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType        Source output data type
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		this(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			StartupMode.GROUP_OFFSETS,
+			Collections.emptyMap(),
+			DEFAULT_STARTUP_TIMESTAMP_MILLIS);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return this.scanFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		DeserializationSchema<RowData> deserializationSchema =
+				getDeserializationSchema(runtimeProviderContext);
+		// Version-specific Kafka consumer
+		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
+				getKafkaConsumer(topic, properties, deserializationSchema);
+		return SourceFunctionProvider.of(kafkaConsumer, true);
+	}
+
+	/**
+	 * Returns the properties for the Kafka consumer.
+	 *
+	 * @return properties for the Kafka consumer.
+	 */
+	public Properties getProperties() {
+		return properties;
+	}
+
+	/**
+	 * Returns the deserialization schema.
+	 */
+	public DeserializationSchema<RowData> getDeserializationSchema(
+			Context runtimeProviderContext){
+		return this.scanFormat.createScanFormat(runtimeProviderContext, this.outputDataType);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final KafkaScanSourceBase that = (KafkaScanSourceBase) o;
+		return Objects.equals(outputDataType, that.outputDataType) &&
+			Objects.equals(topic, that.topic) &&
+			Objects.equals(properties, that.properties) &&
+			Objects.equals(scanFormat, that.scanFormat) &&
+			startupMode == that.startupMode &&
+			Objects.equals(specificStartupOffsets, that.specificStartupOffsets) &&
+			startupTimestampMillis == that.startupTimestampMillis;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			startupMode,
+			specificStartupOffsets,
+			startupTimestampMillis);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Abstract methods for subclasses
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a version-specific Kafka consumer.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<RowData> deserializationSchema);
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns a version-specific Kafka consumer with the start position configured.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected FlinkKafkaConsumerBase<RowData> getKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<RowData> deserializationSchema) {
+		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
+				createKafkaConsumer(topic, properties, deserializationSchema);
+		switch (startupMode) {
+		case EARLIEST:

Review comment:
       Please fix the indentation of the `case` labels.
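       Roughly like this; the setter calls are assumed from the standard FlinkKafkaConsumerBase API, the point is only the indentation:

```java
switch (startupMode) {
	case EARLIEST:
		kafkaConsumer.setStartFromEarliest();
		break;
	case LATEST:
		kafkaConsumer.setStartFromLatest();
		break;
	case GROUP_OFFSETS:
		kafkaConsumer.setStartFromGroupOffsets();
		break;
	case SPECIFIC_OFFSETS:
		kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
		break;
	case TIMESTAMP:
		kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
		break;
}
```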

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();
+
+	// --------------------------------------------------------------------------------------------
+	// Sink specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
+			.key("sink.partitioner")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional output partitioning from Flink's partitions\n"
+					+ "into Kafka's partitions valid enumerations are\n"
+					+ "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
+					+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+					+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+
+	// --------------------------------------------------------------------------------------------
+	// Option enumerations
+	// --------------------------------------------------------------------------------------------
+
+	// Start up offset.
+	public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+	private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+			SCAN_STARTUP_MODE_VALUE_EARLIEST,
+			SCAN_STARTUP_MODE_VALUE_LATEST,
+			SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+	// Sink partitioner.
+	public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+	private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
+			SINK_PARTITIONER_VALUE_FIXED,
+			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+	// Prefix for Kafka specific properties.
+	public static final String PROPERTIES = "properties";
+
+	// Other keywords.
+	private static final String PARTITION = "partition";
+	private static final String OFFSET = "offset";
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateScanStartupMode(tableOptions);
+		validateSinkPartitioner(tableOptions);
+	}
+
+	private static void validateScanStartupMode(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(String::toLowerCase)
+				.ifPresent(mode -> {
+					if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
+						throw new ValidationException(
+								String.format("Invalid value for option '%s'. Supported values are %s, but was: %s",
+										SCAN_STARTUP_MODE.key(),
+										"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
+										mode));
+					}
+
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+									SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+						}
+					}
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
+						}
+						String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+						parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+			}
+		});
+	}
+
+	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SINK_PARTITIONER)
+				.ifPresent(partitioner -> {
+					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
+						if (partitioner.isEmpty()) {
+							throw new ValidationException(
+									String.format("Option '%s' should be a non-empty string.",
+											SINK_PARTITIONER.key()));
+						}
+					}
+				});
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static StartupOptions getStartupOptions(
+			ReadableConfig tableOptions,
+			String topic) {
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(modeString -> {
+					switch (modeString) {
+					case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+						return StartupMode.EARLIEST;
+
+					case SCAN_STARTUP_MODE_VALUE_LATEST:
+						return StartupMode.LATEST;
+
+					case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+						return StartupMode.GROUP_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+						buildSpecificOffsets(tableOptions, topic, specificOffsets);
+						return StartupMode.SPECIFIC_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+						return StartupMode.TIMESTAMP;
+
+					default:
+						throw new TableException("Unsupported startup mode. Validator should have checked that.");
+					}
+				}).orElse(StartupMode.GROUP_OFFSETS);
+		final StartupOptions options = new StartupOptions();
+		options.startupMode = startupMode;
+		options.specificOffsets = specificOffsets;
+		if (startupMode == StartupMode.TIMESTAMP) {
+			options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		}
+		return options;
+	}
+
+	private static void buildSpecificOffsets(
+			ReadableConfig tableOptions,
+			String topic,
+			Map<KafkaTopicPartition, Long> specificOffsets) {
+		String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		final Map<Integer, Long> offsetMap = parseSpecificOffsets(
+				specificOffsetsStrOpt,
+				SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+		offsetMap.forEach((partition, offset) -> {
+			final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+			specificOffsets.put(topicPartition, offset);
+		});
+	}
+
+	public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+		final Properties kafkaProperties = new Properties();
+
+		if (hasKafkaClientProperties(tableOptions)) {
+			tableOptions.keySet().stream()
+					.filter(key -> key.startsWith(PROPERTIES + '.'))
+					.forEach(key -> {
+						final String value = tableOptions.get(key);
+						final String subKey = key.substring((PROPERTIES + '.').length());
+						kafkaProperties.put(subKey, value);
+					});
+		}
+		return kafkaProperties;
+	}
+
+	/**
+	 * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class name.
+	 */
+	@SuppressWarnings("unchecked")
+	public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(SINK_PARTITIONER)
+				.flatMap((String partitioner) -> {
+					switch (partitioner) {
+					case SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					// Default fallback to full class name of the partitioner.
+					default:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+								getPartitionerClass(partitioner);
+						return Optional.of((FlinkKafkaPartitioner<RowData>) InstantiationUtil.instantiate(partitionerClass));
+					}
+				});
+	}
+
+	/**
+	 * Parses SpecificOffsets String to Map.
+	 *
+	 * <p>SpecificOffsets String format was given as following:
+	 *
+	 * <pre>
+	 *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 * </pre>
+	 *
+	 * @return SpecificOffsets with Map format, key is partition, and value is offset
+	 */
+	public static Map<Integer, Long> parseSpecificOffsets(
+			String specificOffsetsStr,
+			String optionKey) {
+		final Map<Integer, Long> offsetMap = new HashMap<>();
+		final String[] pairs = specificOffsetsStr.split(";");
+		final String validationExceptionMessage = String.format(
+				"Invalid properties '%s' should follow the format "
+						+ "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+				optionKey,
+				specificOffsetsStr);
+
+		if (pairs.length == 0) {
+			throw new ValidationException(validationExceptionMessage);
+		}
+
+		for (String pair : pairs) {
+			if (null == pair || pair.length() == 0 || !pair.contains(",")) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			final String[] kv = pair.split(",");
+			if (kv.length != 2 ||
+					!kv[0].startsWith(PARTITION + ':') ||
+					!kv[1].startsWith(OFFSET + ':')) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+			String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+			try {
+				final Integer partition = Integer.valueOf(partitionValue);
+				final Long offset = Long.valueOf(offsetValue);
+				offsetMap.put(partition, offset);
+			} catch (NumberFormatException e) {
+				throw new ValidationException(validationExceptionMessage, e);
+			}
+		}
+		return offsetMap;
+	}
+
+	/** Decides if the table options contains Kafka client properties that start with prefix 'properties'. */
+	private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+		return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES));
+	}
+
+	/**
+	 * Returns a class value with the given class name.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T> Class<T> getPartitionerClass(String name) {
+		final Class<?> clazz;
+		try {
+			clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
+			if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+				throw new ValidationException(
+						String.format("Sink partitioner class '%s' should extend from the required class %s",
+								name,
+								FlinkKafkaPartitioner.class.getName()));
+			}
+			return (Class<T>) clazz;
+		} catch (Exception e) {
+			throw new ValidationException("Could not get class '" + name, e);

Review comment:
```suggestion
			throw new ValidationException("Could not find partitioner class '" + name + "'.", e);
```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();
+
+	// --------------------------------------------------------------------------------------------
+	// Sink specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
+			.key("sink.partitioner")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional output partitioning from Flink's partitions\n"
+					+ "into Kafka's partitions valid enumerations are\n"
+					+ "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
+					+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+					+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+
+	// --------------------------------------------------------------------------------------------
+	// Option enumerations
+	// --------------------------------------------------------------------------------------------
+
+	// Start up offset.
+	public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+	private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+			SCAN_STARTUP_MODE_VALUE_EARLIEST,
+			SCAN_STARTUP_MODE_VALUE_LATEST,
+			SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+	// Sink partitioner.
+	public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+	private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
+			SINK_PARTITIONER_VALUE_FIXED,
+			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+	// Prefix for Kafka specific properties.
+	public static final String PROPERTIES = "properties";
+
+	// Other keywords.
+	private static final String PARTITION = "partition";
+	private static final String OFFSET = "offset";
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateScanStartupMode(tableOptions);
+		validateSinkPartitioner(tableOptions);
+	}
+
+	private static void validateScanStartupMode(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(String::toLowerCase)
+				.ifPresent(mode -> {
+					if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
+						throw new ValidationException(
+								String.format("Invalid value for option '%s'. Supported values are %s, but was: %s",
+										SCAN_STARTUP_MODE.key(),
+										"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
+										mode));
+					}
+
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+									SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+						}
+					}
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
+						}
+						String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+						parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+			}
+		});
+	}
+
+	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SINK_PARTITIONER)
+				.ifPresent(partitioner -> {
+					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
+						if (partitioner.isEmpty()) {
+							throw new ValidationException(
+									String.format("Option '%s' should be a non-empty string.",
+											SINK_PARTITIONER.key()));
+						}
+					}
+				});
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static StartupOptions getStartupOptions(
+			ReadableConfig tableOptions,
+			String topic) {
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(modeString -> {
+					switch (modeString) {
+					case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+						return StartupMode.EARLIEST;
+
+					case SCAN_STARTUP_MODE_VALUE_LATEST:
+						return StartupMode.LATEST;
+
+					case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+						return StartupMode.GROUP_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+						buildSpecificOffsets(tableOptions, topic, specificOffsets);
+						return StartupMode.SPECIFIC_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+						return StartupMode.TIMESTAMP;
+
+					default:
+						throw new TableException("Unsupported startup mode. Validator should have checked that.");
+					}
+				}).orElse(StartupMode.GROUP_OFFSETS);
+		final StartupOptions options = new StartupOptions();
+		options.startupMode = startupMode;
+		options.specificOffsets = specificOffsets;
+		if (startupMode == StartupMode.TIMESTAMP) {
+			options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		}
+		return options;
+	}
+
+	private static void buildSpecificOffsets(
+			ReadableConfig tableOptions,
+			String topic,
+			Map<KafkaTopicPartition, Long> specificOffsets) {
+		String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		final Map<Integer, Long> offsetMap = parseSpecificOffsets(
+				specificOffsetsStrOpt,
+				SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+		offsetMap.forEach((partition, offset) -> {
+			final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+			specificOffsets.put(topicPartition, offset);
+		});
+	}
+
+	public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+		final Properties kafkaProperties = new Properties();
+
+		if (hasKafkaClientProperties(tableOptions)) {
+			tableOptions.keySet().stream()
+					.filter(key -> key.startsWith(PROPERTIES + '.'))
+					.forEach(key -> {
+						final String value = tableOptions.get(key);
+						final String subKey = key.substring((PROPERTIES + '.').length());
+						kafkaProperties.put(subKey, value);
+					});
+		}
+		return kafkaProperties;
+	}
+
+	/**
+	 * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class name.
+	 */
+	@SuppressWarnings("unchecked")
+	public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(SINK_PARTITIONER)
+				.flatMap((String partitioner) -> {
+					switch (partitioner) {
+					case SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					// Default fallback to full class name of the partitioner.
+					default:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+								getPartitionerClass(partitioner);
+						return Optional.of((FlinkKafkaPartitioner<RowData>) InstantiationUtil.instantiate(partitionerClass));
+					}
+				});
+	}
+
+	/**
+	 * Parses SpecificOffsets String to Map.
+	 *
+	 * <p>SpecificOffsets String format was given as following:
+	 *
+	 * <pre>
+	 *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 * </pre>
+	 *
+	 * @return SpecificOffsets with Map format, key is partition, and value is offset
+	 */
+	public static Map<Integer, Long> parseSpecificOffsets(
+			String specificOffsetsStr,
+			String optionKey) {
+		final Map<Integer, Long> offsetMap = new HashMap<>();
+		final String[] pairs = specificOffsetsStr.split(";");
+		final String validationExceptionMessage = String.format(
+				"Invalid properties '%s' should follow the format "
+						+ "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+				optionKey,
+				specificOffsetsStr);
+
+		if (pairs.length == 0) {
+			throw new ValidationException(validationExceptionMessage);
+		}
+
+		for (String pair : pairs) {
+			if (null == pair || pair.length() == 0 || !pair.contains(",")) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			final String[] kv = pair.split(",");
+			if (kv.length != 2 ||
+					!kv[0].startsWith(PARTITION + ':') ||
+					!kv[1].startsWith(OFFSET + ':')) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+			String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+			try {
+				final Integer partition = Integer.valueOf(partitionValue);
+				final Long offset = Long.valueOf(offsetValue);
+				offsetMap.put(partition, offset);
+			} catch (NumberFormatException e) {
+				throw new ValidationException(validationExceptionMessage, e);
+			}
+		}
+		return offsetMap;
+	}
+
+	/** Decides if the table options contains Kafka client properties that start with prefix 'properties'. */
+	private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+		return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES));
+	}
+
+	/**
+	 * Returns a class value with the given class name.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T> Class<T> getPartitionerClass(String name) {
+		final Class<?> clazz;
+		try {
+			clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
+			if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+				throw new ValidationException(
+						String.format("Sink partitioner class '%s' should extend from the required class %s",
+								name,
+								FlinkKafkaPartitioner.class.getName()));
+			}
+			return (Class<T>) clazz;
+		} catch (Exception e) {

Review comment:
       The try block should cover only the `Class.forName` call; the supertype check and the cast do not need to be inside it (see the sketch below).
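
       For illustration, a minimal sketch of the narrowed version (the `ClassNotFoundException` message wording is only an example, not taken from the PR):

```java
@SuppressWarnings("unchecked")
private static <T> Class<T> getPartitionerClass(String name) {
	final Class<?> clazz;
	try {
		// Only the reflective lookup needs the try block.
		clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
	} catch (ClassNotFoundException e) {
		throw new ValidationException(
				String.format("Could not find the sink partitioner class '%s'.", name), e);
	}
	if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
		throw new ValidationException(
				String.format("Sink partitioner class '%s' should extend from the required class %s",
						name,
						FlinkKafkaPartitioner.class.getName()));
	}
	return (Class<T>) clazz;
}
```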

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSourceSinkFactoryTestBase.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Abstract test base for {@link KafkaSourceSinkFactoryBase}.
+ */
+public abstract class KafkaSourceSinkFactoryTestBase extends TestLogger {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private static final String TOPIC = "myTopic";
+	private static final int PARTITION_0 = 0;
+	private static final long OFFSET_0 = 100L;
+	private static final int PARTITION_1 = 1;
+	private static final long OFFSET_1 = 123L;
+	private static final String NAME = "name";
+	private static final String COUNT = "count";
+	private static final String TIME = "time";
+	private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND";
+	private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
+	private static final String COMPUTED_COLUMN_NAME = "computed-column";
+	private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
+	private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
+
+	private static final Properties KAFKA_PROPERTIES = new Properties();
+	static {
+		KAFKA_PROPERTIES.setProperty("group.id", "dummy");
+		KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+		KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");

Review comment:
       The `zookeeper.connect` property isn't necessary anymore and can be dropped from the test properties.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSourceBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link ScanTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+@Internal
+public abstract class KafkaScanSourceBase implements ScanTableSource {
+
+	// --------------------------------------------------------------------------------------------
+	// Common attributes
+	// --------------------------------------------------------------------------------------------
+	protected final DataType outputDataType;
+
+	// --------------------------------------------------------------------------------------------
+	// Scan format attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** Scan format for decoding records from Kafka. */
+	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka-specific attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** The Kafka topic to consume. */
+	protected final String topic;
+
+	/** Properties for the Kafka consumer. */
+	protected final Properties properties;
+
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	protected final StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+	/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+	protected final long startupTimestampMillis;
+
+	/** The default value when startup timestamp is not used.*/
+	private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source produced data type
+	 * @param topic                  Kafka topic to consume.
+	 * @param properties             Properties for the Kafka consumer.
+	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param startupMode            Startup mode for the contained consumer.
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+		this.outputDataType = Preconditions.checkNotNull(
+				outputDataType, "Produced data type must not be null.");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.scanFormat = Preconditions.checkNotNull(
+				scanFormat, "Scan format must not be null.");
+		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+		this.specificStartupOffsets = Preconditions.checkNotNull(
+			specificStartupOffsets, "Specific offsets must not be null.");
+		this.startupTimestampMillis = startupTimestampMillis;
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType        Source output data type
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		this(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			StartupMode.GROUP_OFFSETS,
+			Collections.emptyMap(),
+			DEFAULT_STARTUP_TIMESTAMP_MILLIS);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return this.scanFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		DeserializationSchema<RowData> deserializationSchema =
+				getDeserializationSchema(runtimeProviderContext);
+		// Version-specific Kafka consumer
+		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
+				getKafkaConsumer(topic, properties, deserializationSchema);
+		return SourceFunctionProvider.of(kafkaConsumer, true);
+	}
+
+	/**
+	 * Returns the properties for the Kafka consumer.
+	 *
+	 * @return properties for the Kafka consumer.
+	 */
+	public Properties getProperties() {
+		return properties;
+	}
+
+	/**
+	 * Returns the deserialization schema.
+	 */
+	public DeserializationSchema<RowData> getDeserializationSchema(
+			Context runtimeProviderContext){
+		return this.scanFormat.createScanFormat(runtimeProviderContext, this.outputDataType);
+	}

Review comment:
       We don't need to extract a method for just a single line; the `createScanFormat` call can be inlined in `getScanRuntimeProvider` (see the sketch below).
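
       Roughly, with the helper inlined (sketch only):

```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
	final DeserializationSchema<RowData> deserializationSchema =
			scanFormat.createScanFormat(runtimeProviderContext, outputDataType);
	// Version-specific Kafka consumer.
	final FlinkKafkaConsumerBase<RowData> kafkaConsumer =
			getKafkaConsumer(topic, properties, deserializationSchema);
	// `false` because the consumer is unbounded, as noted in the other comment on this file.
	return SourceFunctionProvider.of(kafkaConsumer, false);
}
```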

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSourceBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link ScanTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+@Internal
+public abstract class KafkaScanSourceBase implements ScanTableSource {
+
+	// --------------------------------------------------------------------------------------------
+	// Common attributes
+	// --------------------------------------------------------------------------------------------
+	protected final DataType outputDataType;
+
+	// --------------------------------------------------------------------------------------------
+	// Scan format attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** Scan format for decoding records from Kafka. */
+	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka-specific attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** The Kafka topic to consume. */
+	protected final String topic;
+
+	/** Properties for the Kafka consumer. */
+	protected final Properties properties;
+
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	protected final StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+	/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+	protected final long startupTimestampMillis;
+
+	/** The default value when startup timestamp is not used.*/
+	private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source produced data type
+	 * @param topic                  Kafka topic to consume.
+	 * @param properties             Properties for the Kafka consumer.
+	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param startupMode            Startup mode for the contained consumer.
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+		this.outputDataType = Preconditions.checkNotNull(
+				outputDataType, "Produced data type must not be null.");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.scanFormat = Preconditions.checkNotNull(
+				scanFormat, "Scan format must not be null.");
+		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+		this.specificStartupOffsets = Preconditions.checkNotNull(
+			specificStartupOffsets, "Specific offsets must not be null.");
+		this.startupTimestampMillis = startupTimestampMillis;
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType        Source output data type
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 */
+	protected KafkaScanSourceBase(

Review comment:
       We don't need this constructor.

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();
+
+	// --------------------------------------------------------------------------------------------
+	// Sink specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
+			.key("sink.partitioner")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional output partitioning from Flink's partitions\n"
+					+ "into Kafka's partitions valid enumerations are\n"
+					+ "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
+					+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+					+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+
+	// --------------------------------------------------------------------------------------------
+	// Option enumerations
+	// --------------------------------------------------------------------------------------------
+
+	// Start up offset.
+	public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+	private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+			SCAN_STARTUP_MODE_VALUE_EARLIEST,
+			SCAN_STARTUP_MODE_VALUE_LATEST,
+			SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+	// Sink partitioner.
+	public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+	private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
+			SINK_PARTITIONER_VALUE_FIXED,
+			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+	// Prefix for Kafka specific properties.
+	public static final String PROPERTIES = "properties";
+
+	// Other keywords.
+	private static final String PARTITION = "partition";
+	private static final String OFFSET = "offset";
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateScanStartupMode(tableOptions);
+		validateSinkPartitioner(tableOptions);
+	}
+
+	private static void validateScanStartupMode(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(String::toLowerCase)
+				.ifPresent(mode -> {
+					if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
+						throw new ValidationException(
+								String.format("Invalid value for option '%s'. Supported values are %s, but was: %s",
+										SCAN_STARTUP_MODE.key(),
+										"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
+										mode));
+					}
+
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+									SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+						}
+					}
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
+						}
+						String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+						parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+			}
+		});
+	}
+
+	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SINK_PARTITIONER)
+				.ifPresent(partitioner -> {
+					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
+						if (partitioner.isEmpty()) {
+							throw new ValidationException(
+									String.format("Option '%s' should be a non-empty string.",
+											SINK_PARTITIONER.key()));
+						}
+					}
+				});
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static StartupOptions getStartupOptions(
+			ReadableConfig tableOptions,
+			String topic) {
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(modeString -> {
+					switch (modeString) {
+					case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+						return StartupMode.EARLIEST;
+
+					case SCAN_STARTUP_MODE_VALUE_LATEST:
+						return StartupMode.LATEST;
+
+					case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+						return StartupMode.GROUP_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+						buildSpecificOffsets(tableOptions, topic, specificOffsets);
+						return StartupMode.SPECIFIC_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+						return StartupMode.TIMESTAMP;
+
+					default:
+						throw new TableException("Unsupported startup mode. Validator should have checked that.");
+					}
+				}).orElse(StartupMode.GROUP_OFFSETS);
+		final StartupOptions options = new StartupOptions();
+		options.startupMode = startupMode;
+		options.specificOffsets = specificOffsets;
+		if (startupMode == StartupMode.TIMESTAMP) {
+			options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		}
+		return options;
+	}
+
+	private static void buildSpecificOffsets(
+			ReadableConfig tableOptions,
+			String topic,
+			Map<KafkaTopicPartition, Long> specificOffsets) {
+		String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		final Map<Integer, Long> offsetMap = parseSpecificOffsets(
+				specificOffsetsStrOpt,
+				SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+		offsetMap.forEach((partition, offset) -> {
+			final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+			specificOffsets.put(topicPartition, offset);
+		});
+	}
+
+	public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+		final Properties kafkaProperties = new Properties();
+
+		if (hasKafkaClientProperties(tableOptions)) {
+			tableOptions.keySet().stream()
+					.filter(key -> key.startsWith(PROPERTIES + '.'))
+					.forEach(key -> {
+						final String value = tableOptions.get(key);
+						final String subKey = key.substring((PROPERTIES + '.').length());
+						kafkaProperties.put(subKey, value);
+					});
+		}
+		return kafkaProperties;
+	}
+
+	/**
+	 * The partitioner can be either "fixed", "round-robin" or a customized partitioner full class name.
+	 */
+	@SuppressWarnings("unchecked")
+	public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(SINK_PARTITIONER)
+				.flatMap((String partitioner) -> {
+					switch (partitioner) {
+					case SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					// Default fallback to full class name of the partitioner.
+					default:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+								getPartitionerClass(partitioner);
+						return Optional.of((FlinkKafkaPartitioner<RowData>) InstantiationUtil.instantiate(partitionerClass));
+					}
+				});
+	}
+
+	/**
+	 * Parses SpecificOffsets String to Map.
+	 *
+	 * <p>SpecificOffsets String format was given as following:
+	 *
+	 * <pre>
+	 *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
+	 * </pre>
+	 *
+	 * @return SpecificOffsets with Map format, key is partition, and value is offset
+	 */
+	public static Map<Integer, Long> parseSpecificOffsets(
+			String specificOffsetsStr,
+			String optionKey) {
+		final Map<Integer, Long> offsetMap = new HashMap<>();
+		final String[] pairs = specificOffsetsStr.split(";");
+		final String validationExceptionMessage = String.format(
+				"Invalid properties '%s' should follow the format "
+						+ "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+				optionKey,
+				specificOffsetsStr);
+
+		if (pairs.length == 0) {
+			throw new ValidationException(validationExceptionMessage);
+		}
+
+		for (String pair : pairs) {
+			if (null == pair || pair.length() == 0 || !pair.contains(",")) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			final String[] kv = pair.split(",");
+			if (kv.length != 2 ||
+					!kv[0].startsWith(PARTITION + ':') ||
+					!kv[1].startsWith(OFFSET + ':')) {
+				throw new ValidationException(validationExceptionMessage);
+			}
+
+			String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+			String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+			try {
+				final Integer partition = Integer.valueOf(partitionValue);
+				final Long offset = Long.valueOf(offsetValue);
+				offsetMap.put(partition, offset);
+			} catch (NumberFormatException e) {
+				throw new ValidationException(validationExceptionMessage, e);
+			}
+		}
+		return offsetMap;
+	}
+
+	/** Decides if the table options contains Kafka client properties that start with prefix 'properties'. */
+	private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+		return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES));
+	}
+
+	/**
+	 * Returns a class value with the given class name.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T> Class<T> getPartitionerClass(String name) {
+		final Class<?> clazz;
+		try {
+			clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());

Review comment:
       Use the user classloader from `DynamicTableFactory.Context#getClassLoader` instead of the thread context classloader (see the sketch below).
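
       A sketch of threading the classloader through (the extra parameter is only illustrative; `getFlinkKafkaPartitioner` would need to forward it from the factory as well):

```java
@SuppressWarnings("unchecked")
private static <T> Class<T> getPartitionerClass(String name, ClassLoader classLoader) {
	final Class<?> clazz;
	try {
		clazz = Class.forName(name, true, classLoader);
	} catch (ClassNotFoundException e) {
		throw new ValidationException(
				String.format("Could not find the sink partitioner class '%s'.", name), e);
	}
	if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
		throw new ValidationException(
				String.format("Sink partitioner class '%s' should extend from the required class %s",
						name,
						FlinkKafkaPartitioner.class.getName()));
	}
	return (Class<T>) clazz;
}

// Call site in KafkaSourceSinkFactoryBase, where the DynamicTableFactory.Context is in scope:
// getFlinkKafkaPartitioner(tableOptions, context.getClassLoader());
```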

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSourceBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link ScanTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+@Internal
+public abstract class KafkaScanSourceBase implements ScanTableSource {
+
+	// --------------------------------------------------------------------------------------------
+	// Common attributes
+	// --------------------------------------------------------------------------------------------
+	protected final DataType outputDataType;
+
+	// --------------------------------------------------------------------------------------------
+	// Scan format attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** Scan format for decoding records from Kafka. */
+	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka-specific attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** The Kafka topic to consume. */
+	protected final String topic;
+
+	/** Properties for the Kafka consumer. */
+	protected final Properties properties;
+
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	protected final StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+	/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+	protected final long startupTimestampMillis;
+
+	/** The default value when startup timestamp is not used.*/
+	private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source produced data type
+	 * @param topic                  Kafka topic to consume.
+	 * @param properties             Properties for the Kafka consumer.
+	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param startupMode            Startup mode for the contained consumer.
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+		this.outputDataType = Preconditions.checkNotNull(
+				outputDataType, "Produced data type must not be null.");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.scanFormat = Preconditions.checkNotNull(
+				scanFormat, "Scan format must not be null.");
+		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+		this.specificStartupOffsets = Preconditions.checkNotNull(
+			specificStartupOffsets, "Specific offsets must not be null.");
+		this.startupTimestampMillis = startupTimestampMillis;
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType        Source output data type
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		this(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			StartupMode.GROUP_OFFSETS,
+			Collections.emptyMap(),
+			DEFAULT_STARTUP_TIMESTAMP_MILLIS);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return this.scanFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		DeserializationSchema<RowData> deserializationSchema =
+				getDeserializationSchema(runtimeProviderContext);
+		// Version-specific Kafka consumer
+		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
+				getKafkaConsumer(topic, properties, deserializationSchema);
+		return SourceFunctionProvider.of(kafkaConsumer, true);

Review comment:
       This should be `false`; the Kafka consumer is an unbounded source.
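
       For concreteness, a sketch of the adjusted line:

```java
// The Kafka consumer is a continuously running, unbounded source.
return SourceFunctionProvider.of(kafkaConsumer, false);
```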

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSourceSinkFactoryBase.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_ZK_CONNECT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableOptions;
+
+/**
+ * Factory for creating configured instances of {@link KafkaScanSourceBase}.

Review comment:
       ```suggestion
    * Factory for creating configured instances of {@link KafkaScanSourceBase} and {@link KafkaSinkBase}.
   ```

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSourceBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/**
+ * A version-agnostic Kafka {@link ScanTableSource}.
+ *
+ * <p>The version-specific Kafka consumers need to extend this class and
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ */
+@Internal
+public abstract class KafkaScanSourceBase implements ScanTableSource {
+
+	// --------------------------------------------------------------------------------------------
+	// Common attributes
+	// --------------------------------------------------------------------------------------------
+	protected final DataType outputDataType;
+
+	// --------------------------------------------------------------------------------------------
+	// Scan format attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** Scan format for decoding records from Kafka. */
+	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka-specific attributes
+	// --------------------------------------------------------------------------------------------
+
+	/** The Kafka topic to consume. */
+	protected final String topic;
+
+	/** Properties for the Kafka consumer. */
+	protected final Properties properties;
+
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+	protected final StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+	protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+	/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+	protected final long startupTimestampMillis;
+
+	/** The default value when startup timestamp is not used.*/
+	private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source produced data type
+	 * @param topic                  Kafka topic to consume.
+	 * @param properties             Properties for the Kafka consumer.
+	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param startupMode            Startup mode for the contained consumer.
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+		this.outputDataType = Preconditions.checkNotNull(
+				outputDataType, "Produced data type must not be null.");
+		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+		this.scanFormat = Preconditions.checkNotNull(
+				scanFormat, "Scan format must not be null.");
+		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+		this.specificStartupOffsets = Preconditions.checkNotNull(
+			specificStartupOffsets, "Specific offsets must not be null.");
+		this.startupTimestampMillis = startupTimestampMillis;
+	}
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType        Source output data type
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 */
+	protected KafkaScanSourceBase(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		this(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			StartupMode.GROUP_OFFSETS,
+			Collections.emptyMap(),
+			DEFAULT_STARTUP_TIMESTAMP_MILLIS);
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return this.scanFormat.getChangelogMode();
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		DeserializationSchema<RowData> deserializationSchema =
+				getDeserializationSchema(runtimeProviderContext);
+		// Version-specific Kafka consumer
+		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
+				getKafkaConsumer(topic, properties, deserializationSchema);
+		return SourceFunctionProvider.of(kafkaConsumer, true);
+	}
+
+	/**
+	 * Returns the properties for the Kafka consumer.
+	 *
+	 * @return properties for the Kafka consumer.
+	 */
+	public Properties getProperties() {

Review comment:
       `getProperties()` is not used anywhere; it can be removed.

##########
File path: flink-connectors/flink-connector-kafka-0.10/pom.xml
##########
@@ -171,6 +171,23 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka table factory testing -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>

Review comment:
       Why do we need the `flink-core` test-jar dependency here?

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
##########
@@ -412,7 +412,7 @@ static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offse
 	// This method is implemented in the base class because this is where the startup logging and verifications live.
 	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
 	// Version-specific subclasses which can expose the functionality should override and allow public access.
-	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
+	public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {

Review comment:
       As the comment above says, we can't change this modifier to `public`, because only newer Kafka versions support starting from a timestamp. Maybe we should instead extract an abstract method in `KafkaScanSourceBase` that sets the startup timestamp, so that only the version-specific subclasses expose the functionality (see the sketch below).
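
       One possible shape, keeping the original modifier (the method name and the concrete consumer class `MyVersionSpecificKafkaConsumer` are placeholders, not existing names):

```java
// In KafkaScanSourceBase (version-agnostic): delegate instead of widening visibility.
// Would only be invoked when startupMode == StartupMode.TIMESTAMP.
protected abstract void setConsumerStartFromTimestamp(
		FlinkKafkaConsumerBase<RowData> consumer,
		long startupTimestampMillis);

// In a version-specific table source whose consumer overrides setStartFromTimestamp as public:
@Override
protected void setConsumerStartFromTimestamp(
		FlinkKafkaConsumerBase<RowData> consumer,
		long startupTimestampMillis) {
	((MyVersionSpecificKafkaConsumer<RowData>) consumer).setStartFromTimestamp(startupTimestampMillis);
}
```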

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+	private KafkaOptions() {}
+
+	// --------------------------------------------------------------------------------------------
+	// Kafka specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> TOPIC = ConfigOptions
+			.key("topic")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required topic name from which the table is read");
+
+	public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
+			.key("properties.bootstrap.servers")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required Kafka server connection string");
+
+	public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
+			.key("properties.group.id")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
+
+	public static final ConfigOption<String> PROPS_ZK_CONNECT = ConfigOptions
+			.key("properties.zookeeper.connect")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional ZooKeeper connection string");
+
+	// --------------------------------------------------------------------------------------------
+	// Scan specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
+			.key("scan.startup-mode")
+			.stringType()
+			.defaultValue("group-offsets")
+			.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+					+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\"\n"
+					+ "or \"specific-offsets\"");
+
+	public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
+			.key("scan.startup.specific-offsets")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
+
+	public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
+			.key("scan.startup.timestamp-millis")
+			.longType()
+			.noDefaultValue();
+
+	// --------------------------------------------------------------------------------------------
+	// Sink specific options
+	// --------------------------------------------------------------------------------------------
+
+	public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
+			.key("sink.partitioner")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("Optional output partitioning from Flink's partitions into Kafka's partitions.\n"
+					+ "Valid enumerations are:\n"
+					+ "\"fixed\": each Flink partition ends up in at most one Kafka partition,\n"
+					+ "\"round-robin\": a Flink partition is distributed to Kafka partitions round-robin,\n"
+					+ "\"custom class name\": use a custom FlinkKafkaPartitioner subclass");
+
+	// --------------------------------------------------------------------------------------------
+	// Option enumerations
+	// --------------------------------------------------------------------------------------------
+
+	// Start up offset.
+	public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+	public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
+
+	private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+			SCAN_STARTUP_MODE_VALUE_EARLIEST,
+			SCAN_STARTUP_MODE_VALUE_LATEST,
+			SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+			SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+
+	// Sink partitioner.
+	public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
+	public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+
+	private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
+			SINK_PARTITIONER_VALUE_FIXED,
+			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+
+	// Prefix for Kafka specific properties.
+	public static final String PROPERTIES = "properties";
+
+	// Other keywords.
+	private static final String PARTITION = "partition";
+	private static final String OFFSET = "offset";
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	public static void validateTableOptions(ReadableConfig tableOptions) {
+		validateScanStartupMode(tableOptions);
+		validateSinkPartitioner(tableOptions);
+	}
+
+	private static void validateScanStartupMode(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(String::toLowerCase)
+				.ifPresent(mode -> {
+					if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
+						throw new ValidationException(
+								String.format("Invalid value for option '%s'. Supported values are %s, but was: %s",
+										SCAN_STARTUP_MODE.key(),
+										"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
+										mode));
+					}
+
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+									SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
+						}
+					}
+					if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
+						if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
+							throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+											+ " but missing.",
+									SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
+									SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
+						}
+						String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+						parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+					}
+				});
+	}
+
+	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SINK_PARTITIONER)
+				.ifPresent(partitioner -> {
+					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
+						if (partitioner.isEmpty()) {
+							throw new ValidationException(
+									String.format("Option '%s' should be a non-empty string.",
+											SINK_PARTITIONER.key()));
+						}
+					}
+				});
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static StartupOptions getStartupOptions(
+			ReadableConfig tableOptions,
+			String topic) {
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
+				.map(modeString -> {
+					switch (modeString) {
+					case SCAN_STARTUP_MODE_VALUE_EARLIEST:
+						return StartupMode.EARLIEST;
+
+					case SCAN_STARTUP_MODE_VALUE_LATEST:
+						return StartupMode.LATEST;
+
+					case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
+						return StartupMode.GROUP_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+						buildSpecificOffsets(tableOptions, topic, specificOffsets);
+						return StartupMode.SPECIFIC_OFFSETS;
+
+					case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
+						return StartupMode.TIMESTAMP;
+
+					default:
+						throw new TableException("Unsupported startup mode. Validator should have checked that.");
+					}
+				}).orElse(StartupMode.GROUP_OFFSETS);
+		final StartupOptions options = new StartupOptions();
+		options.startupMode = startupMode;
+		options.specificOffsets = specificOffsets;
+		if (startupMode == StartupMode.TIMESTAMP) {
+			options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+		}
+		return options;
+	}
+
+	private static void buildSpecificOffsets(
+			ReadableConfig tableOptions,
+			String topic,
+			Map<KafkaTopicPartition, Long> specificOffsets) {
+		String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
+		final Map<Integer, Long> offsetMap = parseSpecificOffsets(
+				specificOffsetsStrOpt,
+				SCAN_STARTUP_SPECIFIC_OFFSETS.key());
+		offsetMap.forEach((partition, offset) -> {
+			final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+			specificOffsets.put(topicPartition, offset);
+		});
+	}
+
+	public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+		final Properties kafkaProperties = new Properties();
+
+		if (hasKafkaClientProperties(tableOptions)) {
+			tableOptions.keySet().stream()
+					.filter(key -> key.startsWith(PROPERTIES + '.'))
+					.forEach(key -> {
+						final String value = tableOptions.get(key);
+						final String subKey = key.substring((PROPERTIES + '.').length());
+						kafkaProperties.put(subKey, value);
+					});
+		}
+		return kafkaProperties;
+	}
+
+	/**
+	 * The partitioner can be either "fixed", "round-robin", or the full class name of a custom FlinkKafkaPartitioner subclass.
+	 */
+	@SuppressWarnings("unchecked")
+	public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig tableOptions) {
+		return tableOptions.getOptional(SINK_PARTITIONER)
+				.flatMap((String partitioner) -> {
+					switch (partitioner) {
+					case SINK_PARTITIONER_VALUE_FIXED:
+						return Optional.of(new FlinkFixedPartitioner<>());
+					case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+						return Optional.empty();
+					// Default fallback to full class name of the partitioner.
+					default:
+						final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+								getPartitionerClass(partitioner);
+						return Optional.of((FlinkKafkaPartitioner<RowData>) InstantiationUtil.instantiate(partitionerClass));

Review comment:
       Use the classloader from `DynamicTableFactory#Context#getClassLoader()`.
   Maybe we can directly call `InstantiationUtil#instantiate(className, targetType, classLoader)`.
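   A rough sketch of what that could look like (illustration only — it assumes the factory
   passes its `Context` classloader into this helper and uses the three-argument
   `InstantiationUtil#instantiate` overload mentioned above):

```java
// Hypothetical variant of getFlinkKafkaPartitioner that receives the user classloader
// from DynamicTableFactory.Context instead of relying on the current thread's loader.
@SuppressWarnings("unchecked")
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
		ReadableConfig tableOptions,
		ClassLoader classLoader) {
	return tableOptions.getOptional(SINK_PARTITIONER)
			.flatMap((String partitioner) -> {
				switch (partitioner) {
				case SINK_PARTITIONER_VALUE_FIXED:
					return Optional.of(new FlinkFixedPartitioner<>());
				case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
					return Optional.empty();
				// Default fallback: treat the value as a full class name and let
				// InstantiationUtil load it with the given classloader.
				default:
					return Optional.of((FlinkKafkaPartitioner<RowData>) InstantiationUtil.instantiate(
							partitioner, FlinkKafkaPartitioner.class, classLoader));
				}
			});
}
```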

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -477,24 +477,40 @@ private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory
 		 * keys.
 		 */
 		public void validate() {
+			validateExcept();
+		}
+
+		/**
+		 * Validates the options of the {@link DynamicTableFactory}. It checks for unconsumed option
+		 * keys while ignoring the options with given prefixes.
+		 *
+		 * <p>The option keys that have given prefix {@code prefixToSkip}
+		 * would just be skipped for validation.
+		 *
+		 * @param prefixesToSkip Set of option key prefixes to skip validation
+		 */
+		public void validateExcept(String... prefixesToSkip) {

Review comment:
       What about calling it `validateExceptPrefix(String prefix)`?
   I don't like the varargs prefixes; otherwise users can write `validateExcept()` with no arguments, which is very confusing.
   We can still keep the logic in `validate()`: just add the options whose keys start with `prefix` to `consumedOptionKeys` and then call `validate()` from this method.
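   Roughly something like this (a sketch only — it assumes the helper keeps its options in
   `allOptions` and tracks consumed keys in `consumedOptionKeys`; the actual field names may differ):

```java
/**
 * Validates the options of the {@link DynamicTableFactory}, ignoring all options
 * whose keys start with the given prefix.
 */
public void validateExceptPrefix(String prefix) {
	// Mark every option key with the given prefix as consumed so that
	// validate() does not report it as an unconsumed key.
	allOptions.keySet().stream()
			.filter(key -> key.startsWith(prefix))
			.forEach(consumedOptionKeys::add);
	validate();
}
```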

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaScanSource.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Kafka {@link org.apache.flink.table.connector.source.DynamicTableSource}.
+ */
+@Internal
+public class KafkaScanSource extends KafkaScanSourceBase {
+
+	/**
+	 * Creates a generic Kafka {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source output data type
+	 * @param topic                  Kafka topic to consume
+	 * @param properties             Properties for the Kafka consumer
+	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param startupMode            Startup mode for the contained consumer
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
+	 */
+	public KafkaScanSource(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+
+		super(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			startupMode,
+			specificStartupOffsets,
+			startupTimestampMillis);
+	}
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param outputDataType Source output data type
+	 * @param topic          Kafka topic to consume
+	 * @param properties     Properties for the Kafka consumer
+	 * @param scanFormat     Scan format for decoding records from Kafka
+	 */
+	public KafkaScanSource(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		super(outputDataType, topic, properties, scanFormat);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<RowData> deserializationSchema) {
+		return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
+	}
+
+	@Override
+	public DynamicTableSource copy() {
+		return new KafkaScanSource(
+				this.outputDataType,
+				this.topic,
+				this.properties,
+				this.scanFormat,
+				this.startupMode,
+				this.specificStartupOffsets,
+				this.startupTimestampMillis);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Kafka universal scan source";

Review comment:
       Just "Kafka".

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableTestBase.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Basic Tests for Kafka connector for Table API & SQL.
+ */
+public abstract class KafkaDynamicTableTestBase extends KafkaTestBase {

Review comment:
       Could you reuse the existing `KafkaTableTestBase` test by using JUnit's `Parameterized` runner? See the sketch below.
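   A minimal sketch of that idea (illustrative only — the parameter name and the way the tests
   branch on it are assumptions, not part of this PR):

```java
import java.util.Arrays;
import java.util.Collection;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Run every test once against the legacy factory and once against the new
// DynamicTableSource/Sink based factory.
@RunWith(Parameterized.class)
public abstract class KafkaTableTestBase extends KafkaTestBase {

	@Parameterized.Parameters(name = "useDynamicTableFactory = {0}")
	public static Collection<Boolean> parameters() {
		return Arrays.asList(false, true);
	}

	@Parameterized.Parameter
	public boolean useDynamicTableFactory;

	// existing test methods would branch on useDynamicTableFactory wherever the
	// DDL or factory discovery differs
}
```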

##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010ScanSource.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.10.
+ */
+@Internal
+public class Kafka010ScanSource extends KafkaScanSourceBase {
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param outputDataType         Source output data type
+	 * @param topic                  Kafka topic to consume
+	 * @param properties             Properties for the Kafka consumer
+	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param startupMode            Startup mode for the contained consumer
+	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
+	 * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
+	 *                               mode is {@link StartupMode#TIMESTAMP}
+	 */
+	public Kafka010ScanSource(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			StartupMode startupMode,
+			Map<KafkaTopicPartition, Long> specificStartupOffsets,
+			long startupTimestampMillis) {
+
+		super(
+			outputDataType,
+			topic,
+			properties,
+			scanFormat,
+			startupMode,
+			specificStartupOffsets,
+			startupTimestampMillis);
+	}
+
+	/**
+	 * Creates a Kafka 0.10 {@link StreamTableSource}.
+	 *
+	 * @param outputDataType Source output data type
+	 * @param topic          Kafka topic to consume
+	 * @param properties     Properties for the Kafka consumer
+	 * @param scanFormat     Scan format for decoding records from Kafka.
+	 */
+	public Kafka010ScanSource(
+			DataType outputDataType,
+			String topic,
+			Properties properties,
+			ScanFormat<DeserializationSchema<RowData>> scanFormat) {
+		super(outputDataType, topic, properties, scanFormat);
+	}
+
+	@Override
+	protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<RowData> deserializationSchema) {
+		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
+	}
+
+	@Override
+	public DynamicTableSource copy() {
+		return new Kafka010ScanSource(
+				this.outputDataType,
+				this.topic,
+				this.properties,
+				this.scanFormat,
+				this.startupMode,
+				this.specificStartupOffsets,
+				this.startupTimestampMillis);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Kafka 0.10 scan source";

Review comment:
       Just "Kafka-0.10". The same to others.

##########
File path: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSourceSinkFactoryTestBase.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Abstract test base for {@link KafkaSourceSinkFactoryBase}.
+ */
+public abstract class KafkaSourceSinkFactoryTestBase extends TestLogger {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private static final String TOPIC = "myTopic";
+	private static final int PARTITION_0 = 0;
+	private static final long OFFSET_0 = 100L;
+	private static final int PARTITION_1 = 1;
+	private static final long OFFSET_1 = 123L;
+	private static final String NAME = "name";
+	private static final String COUNT = "count";
+	private static final String TIME = "time";
+	private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND";
+	private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
+	private static final String COMPUTED_COLUMN_NAME = "computed-column";
+	private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
+	private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
+
+	private static final Properties KAFKA_PROPERTIES = new Properties();
+	static {
+		KAFKA_PROPERTIES.setProperty("group.id", "dummy");
+		KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+		KAFKA_PROPERTIES.setProperty("zookeeper.connect", "dummy");
+	}
+
+	private static final String PROPS_SCAN_OFFSETS =
+			String.format("partition:%d,offset:%d;partition:%d,offset:%d",
+					PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1);
+
+	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+			.field(NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(38, 18))
+			.field(TIME, DataTypes.TIMESTAMP(3))
+			.field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)
+				.watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
+				.build();
+
+	private static final TableSchema SINK_SCHEMA = TableSchema.builder()
+			.field(NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(38, 18))
+			.field(TIME, DataTypes.TIMESTAMP(3))
+			.build();
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTableSource() {
+		// prepare parameters for Kafka table source
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+		ScanFormat<DeserializationSchema<RowData>> scanFormat =
+				new TestFormatFactory.ScanFormatMock(",", true);
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"scanTable");
+		CatalogTable catalogTable = createKafkaSourceCatalogTable();
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader());
+
+		// Test scan source equals
+		final KafkaScanSourceBase actualKafkaSource = (KafkaScanSourceBase) actualSource;
+		assertThat(actualKafkaSource, instanceOf(getExpectedScanSourceClass()));
+		assertEquals(producedDataType, actualKafkaSource.outputDataType);
+		assertEquals(TOPIC, actualKafkaSource.topic);
+		assertEquals(KAFKA_PROPERTIES, actualKafkaSource.properties);
+		assertEquals(specificOffsets, actualKafkaSource.specificStartupOffsets);
+		assertEquals(scanFormat, actualKafkaSource.scanFormat);
+
+		// Test Kafka consumer
+		ScanTableSource.ScanRuntimeProvider provider =
+				actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(provider, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+		assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
+	}
+
+	@Test
+	public void testTableSink() {
+		final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
+		SinkFormat<SerializationSchema<RowData>> sinkFormat =
+				new TestFormatFactory.SinkFormatMock(",");
+
+		// Construct table sink using options and table sink factory.
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sinkTable");
+		final CatalogTable sinkTable = createKafkaSinkCatalogTable();
+		final DynamicTableSink actualSink = FactoryUtil.createTableSink(
+				null,
+				objectIdentifier,
+				sinkTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader());
+
+		assertThat(actualSink, instanceOf(getExpectedSinkClass()));
+
+		assertThat(actualSink, instanceOf(KafkaSinkBase.class));
+		final KafkaSinkBase kafkaSink = (KafkaSinkBase) actualSink;
+		assertEquals(consumedDataType, kafkaSink.consumedDataType);
+		assertEquals(TOPIC, kafkaSink.topic);
+		assertEquals(KAFKA_PROPERTIES, kafkaSink.properties);
+		assertThat(kafkaSink.partitioner.isPresent(), is(true));
+		assertThat(kafkaSink.partitioner.get(), instanceOf(FlinkFixedPartitioner.class));
+
+		// Test sink format.
+		final KafkaSinkBase actualKafkaSink = (KafkaSinkBase) actualSink;
+		assertEquals(sinkFormat, actualKafkaSink.sinkFormat);

Review comment:
       Could we assert on the whole expected KafkaSink object, just like `KafkaTableSourceSinkFactoryTestBase` does? For example, see the sketch below.
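   Something along these lines (a sketch only — it assumes the sink classes implement `equals`,
   and `getExpectedSink(...)` is a hypothetical helper the concrete test would provide):

```java
// Build the expected sink from the same parameters and compare it as a whole,
// instead of asserting each field individually.
final DynamicTableSink expectedSink = getExpectedSink(
		consumedDataType,
		TOPIC,
		KAFKA_PROPERTIES,
		Optional.of(new FlinkFixedPartitioner<>()),
		sinkFormat);
assertEquals(expectedSink, actualSink);
```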




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org