Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 09:57:46 UTC

[GitHub] [flink] wuchong commented on a change in pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

wuchong commented on a change in pull request #13850:
URL: https://github.com/apache/flink/pull/13850#discussion_r514910862



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);

Review comment:
       Since we are introducing a new connector, I prefer not to support this option in the first version. It is not very useful and may be error-prone in upsert mode.
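   
    A sketch of what the first version could then look like (illustrative only, not code from this PR): `optionalOptions()` simply leaves out `TOPIC_PATTERN`.
   
    ```java
    // Illustrative only: expose 'topic' but not 'topic-pattern' in the first version.
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
    	final Set<ConfigOption<?>> options = new HashSet<>();
    	options.add(FactoryUtil.KEY_FIELDS_PREFIX);
    	options.add(KafkaOptions.TOPIC);
    	options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
    	return options;
    }
    ```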

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as sink only supports insert-only encoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validatePKConstraints(TableSchema schema) {
+		if (!schema.getPrimaryKey().isPresent()) {
+			throw new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.");

Review comment:
       ```suggestion
   			throw new ValidationException(
   				"'upsert-kafka' tables require to define a PRIMARY KEY constraint. " +
   					"The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " +
   					"The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.");
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}

Review comment:
       ```suggestion
   		if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
   			String identifier = tableOptions.get(FactoryUtil.KEY_FORMAT);
   			throw new ValidationException(String.format(
   				"'upsert-kafka' connector doesn't support '%s' as key format, " +
   					"because '%s' is not in insert-only mode.",
   				identifier,
   				identifier));
   		}
   		if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
   			String identifier = tableOptions.get(FactoryUtil.VALUE_FORMAT);
   			throw new ValidationException(String.format(
   				"'upsert-kafka' connector doesn't support '%s' as value format, " +
   					"because '%s' is not in insert-only mode.",
   				identifier,
   				identifier));
   		}
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||

Review comment:
       keyFormat and valueFormat must both be insert-only; you can check this with `keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
+		final LogicalType physicalType = physicalDataType.getLogicalType();
+		Preconditions.checkArgument(
+			hasRoot(physicalType, LogicalTypeRoot.ROW),
+			"Row data type expected.");
+		final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+		final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);
+
+		if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+			throw new ValidationException(
+					String.format(
+							"The option '%s' can only be declared if a key format is defined using '%s'.",
+							KEY_FIELDS.key(),
+							KEY_FORMAT.key()
+					)
+			);
+		} else if (optionalKeyFormat.isPresent() &&
+				(!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+			throw new ValidationException(
+					String.format(
+							"A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+							KEY_FORMAT.key(),
+							KEY_FIELDS.key()
+					)
+			);
+		}
+
+		if (!optionalKeyFormat.isPresent()) {
+			return new int[0];
+		}
+
+		final List<String> keyFields = optionalKeyFields.get();
+		final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+		return keyFields.stream()
+				.mapToInt(keyField -> {
+					final int pos = physicalFields.indexOf(keyField);
+					if (pos < 0) {
+						throw new ValidationException(
+								String.format(
+										"Could not find the field '%s' in the table schema for usage in the key format. "
+												+ "A key field must be a regular, physical column. "
+												+ "The following columns can be selected in the '%s' option:\n"
+												+ "%s",
+										keyField,
+										KEY_FIELDS.key(),
+										physicalFields
+								)
+						);
+					}
+					return pos;
+				})
+			.toArray();

Review comment:
       We can extract this logic so that it can be reused for deriving the key projection from the primary key information.
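   
    A rough sketch of such an extraction (the helper name `toFieldIndices` and its placement are illustrative, not part of the PR); `createKeyFormatProjection` and the primary-key based key projection could then share it:
   
    ```java
    // Hypothetical helper, shown for illustration only: maps field names to their
    // positions in the physical schema and rejects unknown fields.
    private static int[] toFieldIndices(
    		List<String> fields, List<String> physicalFields, String optionKey) {
    	return fields.stream()
    		.mapToInt(field -> {
    			final int pos = physicalFields.indexOf(field);
    			if (pos < 0) {
    				throw new ValidationException(String.format(
    					"Could not find the field '%s' in the table schema. " +
    						"The following columns can be selected in the '%s' option:\n%s",
    					field, optionKey, physicalFields));
    			}
    			return pos;
    		})
    		.toArray();
    }
   
    // createKeyFormatProjection would then reduce to
    //   toFieldIndices(keyFields, physicalFields, KEY_FIELDS.key());
    // and 'upsert-kafka' could reuse the same helper for the PRIMARY KEY columns.
    ```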

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {

Review comment:
       I don't know why Timo put these methods and options in the common `FactoryUtil`. IMO, they belong in `KafkaOptions`, since this is a very Kafka-specific feature.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
+		final LogicalType physicalType = physicalDataType.getLogicalType();
+		Preconditions.checkArgument(
+			hasRoot(physicalType, LogicalTypeRoot.ROW),
+			"Row data type expected.");
+		final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+		final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);
+
+		if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+			throw new ValidationException(
+					String.format(
+							"The option '%s' can only be declared if a key format is defined using '%s'.",
+							KEY_FIELDS.key(),
+							KEY_FORMAT.key()
+					)
+			);
+		} else if (optionalKeyFormat.isPresent() &&
+				(!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+			throw new ValidationException(
+					String.format(
+							"A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+							KEY_FORMAT.key(),
+							KEY_FIELDS.key()
+					)
+			);
+		}
+
+		if (!optionalKeyFormat.isPresent()) {
+			return new int[0];
+		}
+
+		final List<String> keyFields = optionalKeyFields.get();
+		final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+		return keyFields.stream()
+				.mapToInt(keyField -> {
+					final int pos = physicalFields.indexOf(keyField);
+					if (pos < 0) {
+						throw new ValidationException(
+								String.format(
+										"Could not find the field '%s' in the table schema for usage in the key format. "
+												+ "A key field must be a regular, physical column. "
+												+ "The following columns can be selected in the '%s' option:\n"
+												+ "%s",
+										keyField,
+										KEY_FIELDS.key(),
+										physicalFields
+								)
+						);
+					}
+					return pos;
+				})
+			.toArray();
+	}
+
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the value format.
+	 *
+	 * <p>See {@link #VALUE_FORMAT} and {@link #VALUE_FIELDS_INCLUDE} for more information.
+	 */
+	public static int[] createValueFormatProjection(ReadableConfig options, DataType physicalDataType) {

Review comment:
       We can add an overloaded method `createValueFormatProjection(ReadableConfig options, DataType physicalDataType, Optional<UniqueConstraint> primaryKey)` to reuse the code in this method.
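   
    A possible shape for that overload (signature taken from the comment above, body purely illustrative), assuming the PRIMARY KEY columns are excluded from the value projection when a key is present:
   
    ```java
    // Illustrative only: when a PRIMARY KEY is given, derive the key positions from it
    // so 'upsert-kafka' can share this logic; otherwise fall back to the existing method.
    public static int[] createValueFormatProjection(
    		ReadableConfig options,
    		DataType physicalDataType,
    		Optional<UniqueConstraint> primaryKey) {
    	if (!primaryKey.isPresent()) {
    		return createValueFormatProjection(options, physicalDataType);
    	}
    	final List<String> physicalFields =
    		LogicalTypeChecks.getFieldNames(physicalDataType.getLogicalType());
    	final Set<String> keyFields = new HashSet<>(primaryKey.get().getColumns());
    	// e.g. with 'value.fields-include' = 'EXCEPT_KEY': keep only non-key columns.
    	return IntStream.range(0, physicalFields.size())
    		.filter(pos -> !keyFields.contains(physicalFields.get(pos)))
    		.toArray();
    }
    ```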

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -589,7 +712,7 @@ public ReadableConfig getOptions() {
 
 		private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
 			String identifier = formatFactory.factoryIdentifier();
-			if (formatOption.key().equals(FORMAT_KEY)) {
+			if (formatOption.key().equals(FORMAT_KEY) || formatOption.equals(KEY_FORMAT) || formatOption.equals(VALUE_FORMAT)) {

Review comment:
       We don't need to add this condition. It is already handled by the condition that follows.
   
    If the format key is prefixed with `key.` or `value.`, then all of that format's options should be prefixed with `key.` or `value.` as well, e.g.:
   
   ```
   'value.format' = 'csv',
   'value.csv.ignore-parse-error' = 'true'
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as sink only supports insert-only encoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validatePKConstraints(TableSchema schema) {
+		if (!schema.getPrimaryKey().isPresent()) {
+			throw new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.");
+		}
+
+		for (String key: schema.getPrimaryKey().get().getColumns()) {
+			if (!schema.getTableColumn(key).get().isPhysical()) {
+				throw new ValidationException(

Review comment:
       This will be validated by the planner. We don't need to validate it again in the connector.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),

Review comment:
       Why not use `KafkaOptions#getKafkaProperties`?
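   For example (a sketch, assuming `KafkaOptions.getKafkaProperties` takes the catalog table's options map, as the regular Kafka factory does):

   ```java
   // Sketch: build the full Kafka client Properties instead of only passing the bootstrap servers.
   final Properties properties =
   	KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions());
   ```

   The `properties` object would then replace the `getBootstrapServers(tableOptions)` argument passed to `KafkaDynamicSource`.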

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||

Review comment:
       Extract the changelog mode validation logic above into a helper and reuse it here.
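   For example, both validations could delegate to a shared helper (a sketch; the helper name is illustrative):

   ```java
   // Sketch: shared changelog-mode validation for the key and value formats.
   private static void validateFormatChangelogMode(
   		ChangelogMode keyMode, ChangelogMode valueMode, String role) {
   	if (keyMode.contains(RowKind.UPDATE_AFTER) || valueMode.contains(RowKind.UPDATE_AFTER)) {
   		throw new ValidationException(
   			"Currently 'upsert-kafka' connector as " + role + " only supports insert-only formats.");
   	}
   }
   ```

   The source validation would call it with `"source"` and the sink validation with `"sink"`, passing `keyFormat.getChangelogMode()` and `valueFormat.getChangelogMode()`.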

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -280,8 +280,8 @@ private static void validateSinkSemantic(ReadableConfig tableOptions) {
 	// Utilities
 	// --------------------------------------------------------------------------------------------
 
-	public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions){
-		switch (tableOptions.get(SINK_SEMANTIC)){
+	public static KafkaSinkSemantic getSinkSemantic(String semantic){

Review comment:
       We don't need to change this. 

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -105,24 +122,37 @@
 
 	public KafkaDynamicSource(
 			DataType physicalDataType,
+			ChangelogMode changelogMode,
+			@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+			int[] keyProjection,
+			int[] valueProjection,
+			@Nullable String keyPrefix,
 			@Nullable List<String> topics,
 			@Nullable Pattern topicPattern,
 			Properties properties,
-			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
+		// Format attributes
 		this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+		this.keyDecodingFormat = keyDecodingFormat;
+		this.valueDecodingFormat = Preconditions.checkNotNull(
+				valueDecodingFormat, "Value decoding format must not be null.");
+		this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+		this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+		this.keyPrefix = keyPrefix;
+		// Mutable attributes
 		this.producedDataType = physicalDataType;
 		this.metadataKeys = Collections.emptyList();
+		this.changelogMode = changelogMode;

Review comment:
       From my point of view, letting invokers provide the `ChangelogMode` is not safe.
   A more elegant way is for the table factory to provide wrappers around `DecodingFormat` and `EncodingFormat`, e.g. the upsert-kafka factory should provide a wrapper whose `DecodingFormat#getChangelogMode()` returns the upsert changelog mode. Then nothing needs to change in `KafkaDynamicSource` and `KafkaDynamicSink`.
   
   I think you need the wrapper anyway, because you need to change the RowKind from `INSERT` to `UPDATE_AFTER`.
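   A rough sketch of such a wrapper on the decoding side (the class name is illustrative; the methods follow how `DecodingFormat` is used elsewhere in this PR):

   ```java
   // Sketch: delegate decoding to the discovered format but report the upsert changelog mode.
   private static final class UpsertDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
   
   	private final DecodingFormat<DeserializationSchema<RowData>> innerFormat;
   
   	UpsertDecodingFormat(DecodingFormat<DeserializationSchema<RowData>> innerFormat) {
   		this.innerFormat = innerFormat;
   	}
   
   	@Override
   	public DeserializationSchema<RowData> createRuntimeDecoder(
   			DynamicTableSource.Context context, DataType producedDataType) {
   		return innerFormat.createRuntimeDecoder(context, producedDataType);
   	}
   
   	@Override
   	public ChangelogMode getChangelogMode() {
   		// the upsert source produces UPDATE_AFTER and DELETE instead of plain INSERT
   		return ChangelogMode.newBuilder()
   			.addContainedKind(RowKind.UPDATE_AFTER)
   			.addContainedKind(RowKind.DELETE)
   			.build();
   	}
   }
   ```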

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -71,16 +88,45 @@ public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Excepti
 		throw new IllegalStateException("A collector is required for deserializing.");
 	}
 
+	/**
+	 * There are 4 situations:
+	 * 1. value data only: collector will ignore the deserialization error;
+	 * 2. value data and metadata only:
+	 *   if value fails to deserialize, value decoding format will ignore data;
+	 *   else OutputProjectionCollector will emitRow with key is null;
+	 * 3. key data and value data (no tombstone):
+	 *   if key fails to deserialize and value succeed: OutputProjection ignores the value data;
+	 *   else if key succeed and value fails, value decoding format will ignore;
+	 *   else if both succeed, OutputProjection will emitRow with the key and data.
+	 * 4.key data and value data (tombstone):
+	 *   if key fails, OutputProjection ignores value data;
+	 *   else emitRow with key and null value.
+	 */
 	@Override
 	public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
+		// shortcut in case no output projection is required,
+		// also not for a cartesian product with the keys
+		if (keyDeserialization == null && !hasMetadata) {
 			valueDeserialization.deserialize(record.value(), collector);
+			return;
+		}
+
+		// buffer key(s)
+		if (keyDeserialization != null) {
+			keyDeserialization.deserialize(record.key(), keyCollector);
+		}
+
+		// project output while emitting values
+		outputCollector.inputRecord = record;
+		outputCollector.physicalKeyRows = keyCollector.buffer;
+		outputCollector.outputCollector = collector;
+		if (record.value() != null) {
+			valueDeserialization.deserialize(record.value(), outputCollector);
 		} else {
-			metadataAppendingCollector.inputRecord = record;
-			metadataAppendingCollector.outputCollector = collector;
-			valueDeserialization.deserialize(record.value(), metadataAppendingCollector);
+			// trigger collect by hand
+			outputCollector.collect(null);

Review comment:
       We should add a flag, e.g. `upsertMode=true`, to enable this behavior, because for the normal Kafka connector tombstone messages should be skipped.
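   A minimal sketch of how the flag could guard this branch (field and constructor wiring omitted; `upsertMode` is the hypothetical flag named above):

   ```java
   // Sketch: only hand tombstone records (null value) to the collector in upsert mode.
   if (record.value() != null) {
   	valueDeserialization.deserialize(record.value(), outputCollector);
   } else if (upsertMode) {
   	// trigger collect by hand so a DELETE row can be emitted for the buffered key
   	outputCollector.collect(null);
   }
   // in the normal Kafka connector (upsertMode == false) the tombstone message is simply skipped
   ```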

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {

Review comment:
       Add `serialVersionUID`.
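   For example, matching the sibling `OutputProjectionCollector` in the same file:

   ```java
   private static final long serialVersionUID = 1L;
   ```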

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCaseUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.utils.TableTestMatchers;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * utils.
+ * */
+public class KafkaTableITCaseUtils {

Review comment:
       It would be better to call this `KafkaTableTestUtils`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)

Review comment:
       Hard-code `KafkaSinkSemantic.AT_LEAST_ONCE` here instead.
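   For example (a sketch; `KafkaSinkSemantic` is the enum returned by `getSinkSemantic`):

   ```java
   // Sketch: pass the semantic directly instead of resolving it from a string constant.
   final KafkaSinkSemantic sinkSemantic = KafkaSinkSemantic.AT_LEAST_ONCE;
   ```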

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link UpsertKafkaDynamicTableFactory}.
+ */
+public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
+
+	private static final String SOURCE_TOPICS = "sourceTopic_1;sourceTopic_2";
+
+	private static final List<String> SOURCE_TOPIC_LIST =
+			Arrays.asList("sourceTopic_1", "sourceTopic_2");
+
+	private static final String SINK_TOPIC = "sinkTopic";
+
+	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+			.field("window_start", new AtomicDataType(new VarCharType(false, 100)))
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))

Review comment:
       Use `DataTypes` to create `DataType`.
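   For example, assuming the intent is a non-null `VARCHAR(100)`:

   ```java
   // Sketch: DataTypes factory methods instead of constructing logical types by hand.
   private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
   	.field("window_start", DataTypes.VARCHAR(100).notNull())
   	.field("region", DataTypes.VARCHAR(100).notNull())
   	.field("view_count", DataTypes.BIGINT())
   	.primaryKey("window_start", "region")
   	.build();
   ```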

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+		private final List<RowData> buffer = new ArrayList<>();
+
+		@Override
+		public void collect(RowData record) {
+			buffer.add(record);
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits a row with key, value, and metadata fields.
+	 *
+	 * <p>The collector is able to handle the following kinds of keys:
+	 * <ul>
+	 *     <li>No key is used.
+	 *     <li>A key is used.
+	 *     <li>The deserialization schema emits multiple keys.
+	 * </ul>
+	 */
+	private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
 		private static final long serialVersionUID = 1L;
 
+		private final int[] keyProjection;
+
+		private final int[] valueProjection;
+
 		private final MetadataConverter[] metadataConverters;
 
 		private transient ConsumerRecord<?, ?> inputRecord;
 
+		private transient List<RowData> physicalKeyRows;
+
 		private transient Collector<RowData> outputCollector;
 
-		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+		OutputProjectionCollector(
+				int[] keyProjection,
+				int[] valueProjection,
+				MetadataConverter[] metadataConverters) {
+			this.keyProjection = keyProjection;
+			this.valueProjection = valueProjection;
 			this.metadataConverters = metadataConverters;
 		}
 
 		@Override
-		public void collect(RowData physicalRow) {
-			final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-			final int physicalArity = physicalRow.getArity();
+		public void collect(RowData physicalValueRow) {
+			// no key defined
+			if (keyProjection.length == 0) {
+				emitRow(null, (GenericRowData) physicalValueRow);
+				return;
+			}
+
+			// emit a value for each key
+			// if parsing key data gets problems, ignore the value data.
+			for (RowData physicalKeyRow : physicalKeyRows) {
+				if (physicalValueRow == null) {
+					physicalKeyRow.setRowKind(RowKind.DELETE);
+				}
+				emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+			}
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+
+		/**
+		 * There are 4 situations:
+		 * 1. key is null && value is not null => project and set rowkind = insert
+		 * 2. key is not null && value is not null => project and set rowkind = insert
+		 * 3. key is not null && value is null => project and set rowkind = delete
+		 * 4. key is null && value is null => impossible
+		 * This situation is impossible:
+		 *   keyProjection.length > 0 && key == null
+		 * */
+		private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
 			final int metadataArity = metadataConverters.length;
+			final int physicalArity = keyProjection.length + valueProjection.length;
+			final RowKind rowkind = physicalValueRow == null ? RowKind.DELETE : physicalValueRow.getRowKind();

Review comment:
       This should also be guarded by `upsertMode == true`.
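   A sketch of the guarded version (relying on the value row never being null outside upsert mode, since the tombstone path is skipped there):

   ```java
   // Sketch: only derive DELETE from a null value row in upsert mode; otherwise keep the row's own kind.
   final RowKind rowkind = (upsertMode && physicalValueRow == null)
   	? RowKind.DELETE
   	: physicalValueRow.getRowKind();
   ```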

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);

Review comment:
       `keyRow` should always be in the `INSERT` RowKind.
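   i.e. roughly (a sketch based on the surrounding code):

   ```java
   // Sketch: project the key row with a fixed INSERT kind, independent of the consumed row's kind.
   final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
   keySerialized = keySerialization.serialize(keyRow);
   ```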

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+		private final List<RowData> buffer = new ArrayList<>();
+
+		@Override
+		public void collect(RowData record) {
+			buffer.add(record);
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits a row with key, value, and metadata fields.
+	 *
+	 * <p>The collector is able to handle the following kinds of keys:
+	 * <ul>
+	 *     <li>No key is used.
+	 *     <li>A key is used.
+	 *     <li>The deserialization schema emits multiple keys.
+	 * </ul>
+	 */
+	private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
 		private static final long serialVersionUID = 1L;
 
+		private final int[] keyProjection;
+
+		private final int[] valueProjection;
+
 		private final MetadataConverter[] metadataConverters;
 
 		private transient ConsumerRecord<?, ?> inputRecord;
 
+		private transient List<RowData> physicalKeyRows;
+
 		private transient Collector<RowData> outputCollector;
 
-		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+		OutputProjectionCollector(
+				int[] keyProjection,
+				int[] valueProjection,
+				MetadataConverter[] metadataConverters) {
+			this.keyProjection = keyProjection;
+			this.valueProjection = valueProjection;
 			this.metadataConverters = metadataConverters;
 		}
 
 		@Override
-		public void collect(RowData physicalRow) {
-			final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-			final int physicalArity = physicalRow.getArity();
+		public void collect(RowData physicalValueRow) {
+			// no key defined
+			if (keyProjection.length == 0) {
+				emitRow(null, (GenericRowData) physicalValueRow);
+				return;
+			}
+
+			// emit a value for each key
+			// if parsing key data gets problems, ignore the value data.
+			for (RowData physicalKeyRow : physicalKeyRows) {
+				if (physicalValueRow == null) {
+					physicalKeyRow.setRowKind(RowKind.DELETE);

Review comment:
       We don't need to set the `RowKind` on `keyRow`, because we never use it.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+			keySerialized = keySerialization.serialize(keyRow);
 		}
 
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+		final byte[] valueSerialized;
+		if (kind == RowKind.DELETE) {
+			valueSerialized = null;
+		} else {
+			final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+			valueSerialized = valueSerialization.serialize(valueRow);

Review comment:
       This should be guarded by `upsertMode == true`.
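   A sketch with the guard, assuming the same `upsertMode` flag as on the deserialization side:

   ```java
   // Sketch: only map DELETE rows to tombstone (null) values when running in upsert mode.
   final byte[] valueSerialized;
   if (upsertMode && kind == RowKind.DELETE) {
   	valueSerialized = null;
   } else {
   	final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
   	valueSerialized = valueSerialization.serialize(valueRow);
   }
   ```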

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+			keySerialized = keySerialization.serialize(keyRow);
 		}
 
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+		final byte[] valueSerialized;
+		if (kind == RowKind.DELETE) {

Review comment:
       `kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE`

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link UpsertKafkaDynamicTableFactory}.
+ */
+public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
+
+	private static final String SOURCE_TOPICS = "sourceTopic_1;sourceTopic_2";
+
+	private static final List<String> SOURCE_TOPIC_LIST =
+			Arrays.asList("sourceTopic_1", "sourceTopic_2");
+
+	private static final String SINK_TOPIC = "sinkTopic";
+
+	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+			.field("window_start", new AtomicDataType(new VarCharType(false, 100)))
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))
+			.field("view_count", DataTypes.BIGINT())
+			.primaryKey("window_start", "region")
+			.build();
+
+	private static final int[] SOURCE_KEY_FIELDS = new int[]{0, 1};
+
+	private static final int[] SOURCE_VALUE_FIELDS = new int[]{0, 1, 2};
+
+	private static final TableSchema SINK_SCHEMA = TableSchema.builder()
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))
+			.field("view_count", DataTypes.BIGINT())
+			.primaryKey("region")
+			.build();
+
+	private static final int[] SINK_KEY_FIELDS = new int[]{0};
+
+	private static final int[] SINK_VALUE_FIELDS = new int[]{0, 1};
+
+	private static final Properties UPSERT_KAFKA_SOURCE_PROPERTIES = new Properties();
+	private static final Properties UPSERT_KAFKA_SINK_PROPERTIES = new Properties();
+
+	static {
+		UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+
+		UPSERT_KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+	}
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testTableSource() {
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+				new TestFormatFactory.DecodingFormatMock(
+						",", true, ChangelogMode.insertOnly());
+
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+				new TestFormatFactory.DecodingFormatMock(
+						",", true, ChangelogMode.insertOnly());
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sourceTable");
+		CatalogTable catalogTable =
+				new CatalogTableImpl(SOURCE_SCHEMA, getFullSourceOptions(), "sourceTable");
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		final KafkaDynamicSource expectedSource = new KafkaDynamicSource(
+				producedDataType,
+				UpsertKafkaDynamicTableFactory.SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				SOURCE_KEY_FIELDS,
+				SOURCE_VALUE_FIELDS,
+				null,
+				SOURCE_TOPIC_LIST,
+				null,
+				UPSERT_KAFKA_SOURCE_PROPERTIES,
+				StartupMode.EARLIEST,
+				Collections.emptyMap(),
+				0);
+		assertEquals(actualSource, expectedSource);
+
+		final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource;
+		ScanTableSource.ScanRuntimeProvider provider =
+				actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(provider, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+		assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
+	}
+
+	@Test
+	public void testTableSink() {
+		final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+				new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+				new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+
+		// Construct table sink using options and table sink factory.
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sinkTable");
+		final CatalogTable sinkTable =
+				new CatalogTableImpl(SINK_SCHEMA, getFullSinkOptions(), "sinkTable");
+		final DynamicTableSink actualSink = FactoryUtil.createTableSink(
+				null,
+				objectIdentifier,
+				sinkTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		final DynamicTableSink expectedSink = new KafkaDynamicSink(
+				consumedDataType,
+				UpsertKafkaDynamicTableFactory.SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				SINK_KEY_FIELDS,
+				SINK_VALUE_FIELDS,
+				null,
+				SINK_TOPIC,
+				UPSERT_KAFKA_SINK_PROPERTIES,
+				Optional.empty(),
+				KafkaOptions.getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE));
+
+		// Test sink format.
+		final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
+		assertEquals(valueEncodingFormat, actualUpsertKafkaSink.valueEncodingFormat);
+		assertEquals(keyEncodingFormat, actualUpsertKafkaSink.keyEncodingFormat);
+		assertEquals(expectedSink, actualSink);
+
+		// Test kafka producer.
+		DynamicTableSink.SinkRuntimeProvider provider =
+				actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+		assertThat(provider, instanceOf(SinkFunctionProvider.class));
+		final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider;
+		final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction();
+		assertThat(sinkFunction, instanceOf(FlinkKafkaProducer.class));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Negative tests
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testCreateSourceTableWithoutPK() {
+		TableSchema illegalSchema = TableSchema.builder()
+				.field("window_start", DataTypes.STRING())
+				.field("region", DataTypes.STRING())
+				.field("view_count", DataTypes.BIGINT())
+				.build();
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sourceTable");
+		CatalogTable catalogTable = new CatalogTableImpl(illegalSchema, getFullSourceOptions(), "sourceTable");
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.")));
+
+		FactoryUtil.createTableSource(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+	}
+
+	@Test
+	public void testCreateSinkTableWithoutPK() {
+		TableSchema illegalSchema = TableSchema.builder()
+				.field("region", DataTypes.STRING())
+				.field("view_count", DataTypes.BIGINT())
+				.build();
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sinkTable");
+		CatalogTable catalogTable = new CatalogTableImpl(illegalSchema, getFullSinkOptions(), "sinkTable");
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.")));
+
+		FactoryUtil.createTableSink(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);

Review comment:
       I think we can extract the verbose table sink/source creation code into a shared helper and reuse it, e.g.
   
   ```java
   	private static DynamicTableSink createTableSink(TableSchema schema, Map<String, String> options) {
   		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
   			"default",
   			"default",
   			"testTable");
   		CatalogTable catalogTable = new CatalogTableImpl(
   			schema,
   			options,
   			"testTable");
   		return FactoryUtil.createTableSink(null,
   			objectIdentifier,
   			catalogTable,
   			new Configuration(),
   			Thread.currentThread().getContextClassLoader(),
   			false);
   	}
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org