Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/10 03:53:17 UTC

[flink] branch master updated: [FLINK-15221][kafka][table] Support sink delivery semantic for Kafka in Table API

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fce502c  [FLINK-15221][kafka][table] Support sink delivery semantic for Kafka in Table API
fce502c is described below

commit fce502cbc481e97b951259ac9350d0e401c68774
Author: Shengkai <10...@qq.com>
AuthorDate: Thu Jul 2 16:28:14 2020 +0800

    [FLINK-15221][kafka][table] Support sink delivery semantic for Kafka in Table API
    
    This closes #12805
---
 docs/dev/table/connectors/kafka.md                 | 20 +++++++++
 docs/dev/table/connectors/kafka.zh.md              | 23 +++++++++-
 .../kafka/table/Kafka010DynamicSink.java           |  6 ++-
 .../kafka/table/Kafka010DynamicTableFactory.java   | 18 +++++++-
 .../table/Kafka010DynamicTableFactoryTest.java     | 50 +++++++++++++++++++++-
 .../kafka/table/Kafka011DynamicSink.java           | 19 +++++---
 .../kafka/table/Kafka011DynamicTableFactory.java   |  6 ++-
 .../table/Kafka011DynamicTableFactoryTest.java     |  6 ++-
 .../kafka/table/KafkaDynamicSinkBase.java          | 22 +++++++---
 .../kafka/table/KafkaDynamicTableFactoryBase.java  | 17 +++++---
 .../connectors/kafka/table/KafkaOptions.java       | 44 +++++++++++++++++++
 .../connectors/kafka/table/KafkaSinkSemantic.java  | 43 +++++++++++++++++++
 .../table/KafkaDynamicTableFactoryTestBase.java    | 40 ++++++++++++++---
 .../connectors/kafka/table/KafkaDynamicSink.java   | 16 ++++---
 .../kafka/table/KafkaDynamicTableFactory.java      |  6 ++-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  |  6 ++-
 16 files changed, 300 insertions(+), 42 deletions(-)

diff --git a/docs/dev/table/connectors/kafka.md b/docs/dev/table/connectors/kafka.md
index f6ec308..cffd633 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -165,6 +165,13 @@ Connector Options
       </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>sink.semantic</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">at-least-once</td>
+      <td>String</td>
+      <td>Defines the delivery semantic for the Kafka sink. Valid values are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
+    </tr>
     </tbody>
 </table>
 
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
 
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation by passing the appropriate `sink.semantic` option:
+
+ * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `exactly-once`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
+ to Kafka using transactions, do not forget to set the desired `isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter is the default value) for any application consuming records
+ from Kafka.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
+
 Data Type Mapping
 ----------------
 
diff --git a/docs/dev/table/connectors/kafka.zh.md b/docs/dev/table/connectors/kafka.zh.md
index ffc373e..1f09f64 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -165,6 +165,14 @@ Connector Options
       </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>sink.semantic</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">at-least-once</td>
+      <td>String</td>
+      <td>Defines the delivery semantic for the Kafka sink. Valid values are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option.
+      See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
+    </tr>
     </tbody>
 </table>
 
@@ -205,7 +213,20 @@ However, it will cause a lot of network connections between all the Flink instan
 
 ### Consistency guarantees
 
-By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.zh.md %}#enabling-and-configuring-checkpointing).
+By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation by passing the appropriate `sink.semantic` option:
+
+ * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `exactly-once`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
+ to Kafka using transactions, do not forget to set the desired `isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter is the default value) for any application consuming records
+ from Kafka.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
 
 Data Type Mapping
 ----------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
index 9972794..89df458 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
@@ -48,7 +48,8 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
 			topic,
 			properties,
 			partitioner,
-			encodingFormat);
+			encodingFormat,
+			KafkaSinkSemantic.AT_LEAST_ONCE);
 	}
 
 	@Override
@@ -56,7 +57,8 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			SerializationSchema<RowData> serializationSchema,
-			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+			KafkaSinkSemantic semantic) {
 		return new FlinkKafkaProducer010<>(
 			topic,
 			serializationSchema,
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
index b5737c4..f7b62f2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -31,6 +32,10 @@ import org.apache.flink.table.types.DataType;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Factory for creating configured instances of {@link Kafka010DynamicSource}.
@@ -65,8 +70,11 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 
+		// Kafka 0.10 doesn't support the exactly-once guarantee, so the semantic must be the default at-least-once
+		checkArgument(semantic.equals(KafkaSinkSemantic.AT_LEAST_ONCE));
 		return new Kafka010DynamicSink(
 			consumedDataType,
 			topic,
@@ -79,4 +87,12 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 	public String factoryIdentifier() {
 		return IDENTIFIER;
 	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = super.optionalOptions();
+		// Connector kafka-0.10 only supports "at-least-once"
+		options.remove(SINK_SEMANTIC);
+		return options;
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
index 12c892b..0df462a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 import org.apache.flink.streaming.connectors.kafka.Kafka010TableSink;
@@ -28,15 +29,22 @@ import org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactor
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Test for {@link Kafka010TableSource} and {@link Kafka010TableSink} created
  * by {@link Kafka010TableSourceSinkFactory}.
@@ -82,7 +90,10 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
+		// we only support "at-least-once" for kafka 0.10 sink.
+		// if users use `sink.semantic` for kafka 0.10, an exception should be thrown
 		return new Kafka010DynamicSink(
 				consumedDataType,
 				topic,
@@ -90,4 +101,41 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 				partitioner,
 				encodingFormat);
 	}
+
+	@Override
+	protected Map<String, String> getFullSinkOptions(){
+		Map<String, String> options = super.getFullSinkOptions();
+		// remove 'sink.semantic' as kafka 0.10 doesn't support it
+		options.remove("sink.semantic");
+		return options;
+	}
+
+	@Override
+	public void testInvalidSinkSemantic() {
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"sinkTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSinkOptions(),
+			options -> {
+				options.put("sink.semantic", "exactly-once");
+			});
+		final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		try {
+			FactoryUtil.createTableSink(
+				null,
+				objectIdentifier,
+				sinkTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader());
+			fail("Connector 'kafka-0.10' doesn't support 'sink.semantic'");
+		} catch (Exception e) {
+			assertTrue(ExceptionUtils
+				.findThrowableWithMessage(e, "Unsupported options:\n\nsink.semantic")
+				.isPresent());
+		}
+	}
 }
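For illustration only (not part of this commit), the user-facing effect that the test above asserts: because Kafka010DynamicTableFactory removes SINK_SEMANTIC from its optional options, declaring the option on a 'kafka-0.10' table is rejected rather than silently ignored. The table below is hypothetical and reuses the tEnv from the earlier sketch.

// Registering the table succeeds; the ValidationException ("Unsupported options: sink.semantic")
// is thrown once a sink is created from it, e.g. by an INSERT INTO statement.
tEnv.executeSql(
		"CREATE TABLE kafka010_sink (\n" +
		"  user_id BIGINT\n" +
		") WITH (\n" +
		"  'connector' = 'kafka-0.10',\n" +
		"  'topic' = 'user_behavior',\n" +
		"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
		"  'format' = 'json',\n" +
		"  'sink.semantic' = 'exactly-once'\n" +
		")");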
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index d632928..7d29e2a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -42,13 +43,15 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		super(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 
 	@Override
@@ -56,12 +59,15 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			SerializationSchema<RowData> serializationSchema,
-			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+			KafkaSinkSemantic semantic) {
 		return new FlinkKafkaProducer011<>(
 				topic,
-				serializationSchema,
+				new KeyedSerializationSchemaWrapper<>(serializationSchema),
 				properties,
-				partitioner);
+				partitioner,
+				FlinkKafkaProducer011.Semantic.valueOf(semantic.toString()),
+				FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
 
 	@Override
@@ -71,7 +77,8 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 				this.topic,
 				this.properties,
 				this.partitioner,
-				this.encodingFormat);
+				this.encodingFormat,
+				this.semantic);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
index d928fe6..47ac725 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
@@ -63,13 +63,15 @@ public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		return new Kafka011DynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
index 37ecf15..695dcd4 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
@@ -82,12 +82,14 @@ public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		return new Kafka011DynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
index 44058d2..2d6e893 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
@@ -38,11 +38,10 @@ import java.util.Properties;
  * A version-agnostic Kafka {@link DynamicTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional, KafkaSinkSemantic)}}.
  */
 @Internal
 public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
-
 	/** Consumed data type of the table. */
 	protected final DataType consumedDataType;
 
@@ -58,17 +57,22 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 	/** Partitioner to select Kafka partition for each item. */
 	protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
 
+	/** Sink commit semantic. */
+	protected final KafkaSinkSemantic semantic;
+
 	protected KafkaDynamicSinkBase(
 			DataType consumedDataType,
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
 		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
 		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
 		this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
 		this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
+		this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
 	}
 
 	@Override
@@ -85,7 +89,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 				this.topic,
 				properties,
 				serializationSchema,
-				this.partitioner);
+				this.partitioner,
+				this.semantic);
 
 		return SinkFunctionProvider.of(kafkaProducer);
 	}
@@ -103,7 +108,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 		String topic,
 		Properties properties,
 		SerializationSchema<RowData> serializationSchema,
-		Optional<FlinkKafkaPartitioner<RowData>> partitioner);
+		Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+		KafkaSinkSemantic semantic);
 
 	@Override
 	public boolean equals(Object o) {
@@ -118,7 +124,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 			Objects.equals(topic, that.topic) &&
 			Objects.equals(properties, that.properties) &&
 			Objects.equals(encodingFormat, that.encodingFormat) &&
-			Objects.equals(partitioner, that.partitioner);
+			Objects.equals(partitioner, that.partitioner) &&
+			Objects.equals(semantic, that.semantic);
 	}
 
 	@Override
@@ -128,6 +135,7 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 			topic,
 			properties,
 			encodingFormat,
-			partitioner);
+			partitioner,
+			semantic);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
index 1730809..2735e48 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
@@ -43,15 +43,19 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableOptions;
 
@@ -74,12 +78,12 @@ public abstract class KafkaDynamicTableFactoryBase implements
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
-		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
 		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		final KafkaOptions.StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+		final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
 		return createKafkaTableSource(
 				producedDataType,
 				topic,
@@ -101,7 +105,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 				SerializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
-		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		helper.validateExcept(PROPERTIES_PREFIX);
 		// Validate the option values.
 		validateTableOptions(tableOptions);
 
@@ -111,7 +115,8 @@ public abstract class KafkaDynamicTableFactoryBase implements
 				topic,
 				getKafkaProperties(context.getCatalogTable().getOptions()),
 				getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
-				encodingFormat);
+				encodingFormat,
+				getSinkSemantic(tableOptions));
 	}
 
 	/**
@@ -148,7 +153,8 @@ public abstract class KafkaDynamicTableFactoryBase implements
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat);
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic);
 
 	@Override
 	public Set<ConfigOption<?>> requiredOptions() {
@@ -167,6 +173,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 		options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
 		options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
 		options.add(SINK_PARTITIONER);
+		options.add(SINK_SEMANTIC);
 		return options;
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 337fe76..4661e17 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -39,6 +39,10 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.EXACTLY_ONCE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.NONE;
+
 /** Option utils for Kafka table source sink. */
 public class KafkaOptions {
 	private KafkaOptions() {}
@@ -103,6 +107,11 @@ public class KafkaOptions {
 					+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
 					+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
 
+	public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic")
+			.stringType()
+			.defaultValue("at-least-once")
+			.withDescription("Optional delivery semantic when committing. Valid values are [\"at-least-once\", \"exactly-once\", \"none\"]");
+
 	// --------------------------------------------------------------------------------------------
 	// Option enumerations
 	// --------------------------------------------------------------------------------------------
@@ -129,6 +138,17 @@ public class KafkaOptions {
 			SINK_PARTITIONER_VALUE_FIXED,
 			SINK_PARTITIONER_VALUE_ROUND_ROBIN));
 
+	// Sink semantic
+	public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";
+	public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";
+	public static final String SINK_SEMANTIC_VALUE_NONE = "none";
+
+	private static final Set<String> SINK_SEMANTIC_ENUMS = new HashSet<>(Arrays.asList(
+		SINK_SEMANTIC_VALUE_AT_LEAST_ONCE,
+		SINK_SEMANTIC_VALUE_EXACTLY_ONCE,
+		SINK_SEMANTIC_VALUE_NONE
+	));
+
 	// Prefix for Kafka specific properties.
 	public static final String PROPERTIES_PREFIX = "properties.";
 
@@ -143,6 +163,7 @@ public class KafkaOptions {
 	public static void validateTableOptions(ReadableConfig tableOptions) {
 		validateScanStartupMode(tableOptions);
 		validateSinkPartitioner(tableOptions);
+		validateSinkSemantic(tableOptions);
 	}
 
 	private static void validateScanStartupMode(ReadableConfig tableOptions) {
@@ -191,10 +212,33 @@ public class KafkaOptions {
 				});
 	}
 
+	private static void validateSinkSemantic(ReadableConfig tableOptions) {
+		tableOptions.getOptional(SINK_SEMANTIC).ifPresent(semantic -> {
+			if (!SINK_SEMANTIC_ENUMS.contains(semantic)){
+				throw new ValidationException(
+					String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
+						semantic, SINK_SEMANTIC.key()));
+			}
+		});
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Utilities
 	// --------------------------------------------------------------------------------------------
 
+	public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions){
+		switch (tableOptions.get(SINK_SEMANTIC)){
+			case SINK_SEMANTIC_VALUE_EXACTLY_ONCE:
+				return EXACTLY_ONCE;
+			case SINK_SEMANTIC_VALUE_AT_LEAST_ONCE:
+				return AT_LEAST_ONCE;
+			case SINK_SEMANTIC_VALUE_NONE:
+				return NONE;
+			default:
+				throw new TableException("Validator should have checked that");
+		}
+	}
+
 	public static StartupOptions getStartupOptions(
 			ReadableConfig tableOptions,
 			String topic) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java
new file mode 100644
index 0000000..908f215
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Kafka sink semantic Enum.
+ */
+@Internal
+public enum KafkaSinkSemantic {
+	/**
+	 * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be
+	 * committed to Kafka on a checkpoint.
+	 */
+	EXACTLY_ONCE,
+	/**
+	 * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers
+	 * to be acknowledged by the Kafka producer on a checkpoint.
+	 */
+	AT_LEAST_ONCE,
+	/**
+	 * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+	 * of failure.
+	 */
+	NONE
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 09372b1..21b81ea 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -87,6 +87,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 	private static final String COMPUTED_COLUMN_NAME = "computed-column";
 	private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
 	private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
+	private static final String SEMANTIC = "exactly-once";
 
 	private static final Properties KAFKA_PROPERTIES = new Properties();
 	static {
@@ -210,7 +211,9 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 				TOPIC,
 				KAFKA_PROPERTIES,
 				Optional.of(new FlinkFixedPartitioner<>()),
-				encodingFormat);
+				encodingFormat,
+				KafkaSinkSemantic.EXACTLY_ONCE
+			);
 		assertEquals(expectedSink, actualSink);
 
 		// Test sink format.
@@ -326,6 +329,29 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 				Thread.currentThread().getContextClassLoader());
 	}
 
+	@Test
+	public void testInvalidSinkSemantic(){
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+			"default",
+			"default",
+			"sinkTable");
+
+		final Map<String, String> modifiedOptions = getModifiedOptions(
+			getFullSourceOptions(),
+			options -> {
+				options.put("sink.semantic", "xyz");
+			});
+		final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported values are ['at-least-once', 'exactly-once', 'none'].")));
+		FactoryUtil.createTableSink(
+			null,
+			objectIdentifier,
+			sinkTable,
+			new Configuration(),
+			Thread.currentThread().getContextClassLoader());
+	}
 	// --------------------------------------------------------------------------------------------
 	// Utilities
 	// --------------------------------------------------------------------------------------------
@@ -342,7 +368,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 		return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable");
 	}
 
-	private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
+	protected CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
 		return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable");
 	}
 
@@ -351,14 +377,14 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 	 *
 	 * @param optionModifier Consumer to modify the options
 	 */
-	private static Map<String, String> getModifiedOptions(
+	protected static Map<String, String> getModifiedOptions(
 			Map<String, String> options,
 			Consumer<Map<String, String>> optionModifier) {
 		optionModifier.accept(options);
 		return options;
 	}
 
-	private Map<String, String> getFullSourceOptions() {
+	protected Map<String, String> getFullSourceOptions() {
 		Map<String, String> tableOptions = new HashMap<>();
 		// Kafka specific options.
 		tableOptions.put("connector", factoryIdentifier());
@@ -378,7 +404,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 		return tableOptions;
 	}
 
-	private Map<String, String> getFullSinkOptions() {
+	protected Map<String, String> getFullSinkOptions() {
 		Map<String, String> tableOptions = new HashMap<>();
 		// Kafka specific options.
 		tableOptions.put("connector", factoryIdentifier());
@@ -386,6 +412,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 		tableOptions.put("properties.group.id", "dummy");
 		tableOptions.put("properties.bootstrap.servers", "dummy");
 		tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+		tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
 		// Format options.
 		tableOptions.put("format", TestFormatFactory.IDENTIFIER);
 		final String formatDelimiterKey = String.format("%s.%s",
@@ -419,6 +446,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic
 	);
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index aea837f..ec50a00 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -41,13 +41,15 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		super(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 
 	@Override
@@ -55,12 +57,15 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			SerializationSchema<RowData> serializationSchema,
-			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+			KafkaSinkSemantic semantic) {
 		return new FlinkKafkaProducer<>(
 				topic,
 				serializationSchema,
 				properties,
-				partitioner);
+				partitioner.orElse(null),
+				FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
+				FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
 
 	@Override
@@ -70,7 +75,8 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 				this.topic,
 				this.properties,
 				this.partitioner,
-				this.encodingFormat);
+				this.encodingFormat,
+				this.semantic);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 7242238..2a9d640 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -62,13 +62,15 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		return new KafkaDynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 785578a..adf2ac2 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -81,12 +81,14 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSinkSemantic semantic) {
 		return new KafkaDynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				encodingFormat);
+				encodingFormat,
+				semantic);
 	}
 }
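One last illustrative sketch (not part of this commit), tied to the isolation.level caveat in the documentation above: when reading back a topic written with 'sink.semantic' = 'exactly-once', the consumer-side setting can be passed through the connector's 'properties.*' options, which are forwarded to the Kafka client. Table, topic, and group names are hypothetical.

// Hypothetical source table; 'properties.isolation.level' = 'read_committed' keeps
// records of open or aborted Kafka transactions from being emitted downstream.
tEnv.executeSql(
		"CREATE TABLE kafka_source (\n" +
		"  user_id BIGINT,\n" +
		"  item_id BIGINT\n" +
		") WITH (\n" +
		"  'connector' = 'kafka',\n" +
		"  'topic' = 'user_behavior',\n" +
		"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
		"  'properties.group.id' = 'example-group',\n" +
		"  'properties.isolation.level' = 'read_committed',\n" +
		"  'scan.startup.mode' = 'earliest-offset',\n" +
		"  'format' = 'json'\n" +
		")");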