Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/10 03:53:17 UTC
[flink] branch master updated: [FLINK-15221][kafka][table] Support
sink delivery semantic for Kafka in Table API
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fce502c [FLINK-15221][kafka][table] Support sink delivery semantic for Kafka in Table API
fce502c is described below
commit fce502cbc481e97b951259ac9350d0e401c68774
Author: Shengkai <10...@qq.com>
AuthorDate: Thu Jul 2 16:28:14 2020 +0800
[FLINK-15221][kafka][table] Support sink delivery semantic for Kafka in Table API
This closes #12805
---
docs/dev/table/connectors/kafka.md | 20 +++++++++
docs/dev/table/connectors/kafka.zh.md | 23 +++++++++-
.../kafka/table/Kafka010DynamicSink.java | 6 ++-
.../kafka/table/Kafka010DynamicTableFactory.java | 18 +++++++-
.../table/Kafka010DynamicTableFactoryTest.java | 50 +++++++++++++++++++++-
.../kafka/table/Kafka011DynamicSink.java | 19 +++++---
.../kafka/table/Kafka011DynamicTableFactory.java | 6 ++-
.../table/Kafka011DynamicTableFactoryTest.java | 6 ++-
.../kafka/table/KafkaDynamicSinkBase.java | 22 +++++++---
.../kafka/table/KafkaDynamicTableFactoryBase.java | 17 +++++---
.../connectors/kafka/table/KafkaOptions.java | 44 +++++++++++++++++++
.../connectors/kafka/table/KafkaSinkSemantic.java | 43 +++++++++++++++++++
.../table/KafkaDynamicTableFactoryTestBase.java | 40 ++++++++++++++---
.../connectors/kafka/table/KafkaDynamicSink.java | 16 ++++---
.../kafka/table/KafkaDynamicTableFactory.java | 6 ++-
.../kafka/table/KafkaDynamicTableFactoryTest.java | 6 ++-
16 files changed, 300 insertions(+), 42 deletions(-)
diff --git a/docs/dev/table/connectors/kafka.md b/docs/dev/table/connectors/kafka.md
index f6ec308..cffd633 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -165,6 +165,13 @@ Connector Options
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>sink.semantic</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">at-least-once</td>
+ <td>String</td>
+ <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
+ </tr>
</tbody>
</table>
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation by passing the appropriate `sink.semantic` option:
+
+ * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
 * `exactly-once`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
+ to Kafka using transactions, do not forget to set the desired `isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter is the default value) for any application consuming records
+ from Kafka.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
+
Data Type Mapping
----------------
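For illustration, a minimal sketch of wiring the new option from the Table API (topic name, schema, bootstrap servers, and format below are placeholders, not part of this commit); `'sink.semantic' = 'exactly-once'` only takes effect once checkpointing is enabled:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaExactlyOnceSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // exactly-once commits the Kafka transaction on checkpoint completion
        env.enableCheckpointing(60_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
            "CREATE TABLE kafka_sink (\n"
            + "  user_id BIGINT,\n"
            + "  item_id STRING\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'output-topic',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'format' = 'json',\n"
            + "  'sink.semantic' = 'exactly-once'\n"
            + ")");
    }
}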
diff --git a/docs/dev/table/connectors/kafka.zh.md b/docs/dev/table/connectors/kafka.zh.md
index ffc373e..1f09f64 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -165,6 +165,14 @@ Connector Options
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>sink.semantic</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">at-least-once</td>
+ <td>String</td>
+ <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. <code>'kafka-0.10'</code> doesn't support this option.
+ See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details. </td>
+ </tr>
</tbody>
</table>
@@ -205,7 +213,20 @@ However, it will cause a lot of network connections between all the Flink instan
### Consistency guarantees
-By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.zh.md %}#enabling-and-configuring-checkpointing).
+By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.zh.md %}#enabling-and-configuring-checkpointing).
+
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation by passing the appropriate `sink.semantic` option:
+
 * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `exactly-once`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
+ to Kafka using transactions, do not forget to set the desired `isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter is the default value) for any application consuming records
+ from Kafka.
+
+Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
Data Type Mapping
----------------
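Because exactly-once relies on Kafka transactions, the `isolation.level` caveat above matters for every downstream reader. A minimal kafka-clients sketch of a consumer that only sees committed records (topic, group id, and servers are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // skip records from open/aborted transactions (default is read_uncommitted)
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}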
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
index 9972794..89df458 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
@@ -48,7 +48,8 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ KafkaSinkSemantic.AT_LEAST_ONCE);
}
@Override
@@ -56,7 +57,8 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
- Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+ Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+ KafkaSinkSemantic semantic) {
return new FlinkKafkaProducer010<>(
topic,
serializationSchema,
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
index b5737c4..f7b62f2 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -31,6 +32,10 @@ import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Factory for creating configured instances of {@link Kafka010DynamicSource}.
@@ -65,8 +70,11 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
+ // Kafka 0.10 doesn't support the exactly-once guarantee, so the semantic must be the default at-least-once
+ checkArgument(semantic.equals(KafkaSinkSemantic.AT_LEAST_ONCE));
return new Kafka010DynamicSink(
consumedDataType,
topic,
@@ -79,4 +87,12 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
public String factoryIdentifier() {
return IDENTIFIER;
}
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = super.optionalOptions();
+ // Connector kafka-0.10 only supports "at-least-once"
+ options.remove(SINK_SEMANTIC);
+ return options;
+ }
}
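The pattern above, a version-specific factory inheriting the base option set and dropping what it cannot honor, condensed into a standalone sketch (class names here are illustrative, not the actual factory hierarchy):

import java.util.HashSet;
import java.util.Set;

public class OptionRemovalSketch {
    static class BaseFactory {
        Set<String> optionalOptions() {
            Set<String> options = new HashSet<>();
            options.add("sink.partitioner");
            options.add("sink.semantic");
            return options;
        }
    }

    static class Kafka010StyleFactory extends BaseFactory {
        @Override
        Set<String> optionalOptions() {
            Set<String> options = super.optionalOptions();
            // this connector version can't honor the semantic, so don't advertise it;
            // setting it then fails validation with "Unsupported options: sink.semantic"
            options.remove("sink.semantic");
            return options;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Kafka010StyleFactory().optionalOptions()); // [sink.partitioner]
    }
}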
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
index 12c892b..0df462a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.Kafka010TableSink;
@@ -28,15 +29,22 @@ import org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactor
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* Test for {@link Kafka010TableSource} and {@link Kafka010TableSink} created
* by {@link Kafka010TableSourceSinkFactory}.
@@ -82,7 +90,10 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
+ // the Kafka 0.10 sink only supports "at-least-once";
+ // if users set `sink.semantic` for Kafka 0.10, an exception should be thrown
return new Kafka010DynamicSink(
consumedDataType,
topic,
@@ -90,4 +101,41 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
partitioner,
encodingFormat);
}
+
+ @Override
+ protected Map<String, String> getFullSinkOptions() {
+ Map<String, String> options = super.getFullSinkOptions();
+ // remove 'sink.semantic' as kafka 0.10 doesn't support it
+ options.remove("sink.semantic");
+ return options;
+ }
+
+ @Override
+ public void testInvalidSinkSemantic() {
+ ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+ "default",
+ "default",
+ "sinkTable");
+
+ final Map<String, String> modifiedOptions = getModifiedOptions(
+ getFullSinkOptions(),
+ options -> {
+ options.put("sink.semantic", "exactly-once");
+ });
+ final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+ try {
+ FactoryUtil.createTableSink(
+ null,
+ objectIdentifier,
+ sinkTable,
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader());
+ fail("Connector 'kafka-0.10' doesn't support 'sink.semantic'");
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils
+ .findThrowableWithMessage(e, "Unsupported options:\n\nsink.semantic")
+ .isPresent());
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index d632928..7d29e2a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -42,13 +43,15 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
super(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
@Override
@@ -56,12 +59,15 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
- Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+ Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+ KafkaSinkSemantic semantic) {
return new FlinkKafkaProducer011<>(
topic,
- serializationSchema,
+ new KeyedSerializationSchemaWrapper<>(serializationSchema),
properties,
- partitioner);
+ partitioner,
+ FlinkKafkaProducer011.Semantic.valueOf(semantic.toString()),
+ FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
@Override
@@ -71,7 +77,8 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
this.topic,
this.properties,
this.partitioner,
- this.encodingFormat);
+ this.encodingFormat,
+ this.semantic);
}
@Override
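The 0.11 sink bridges the new table-layer enum onto the producer's own Semantic enum by name via valueOf(semantic.toString()); a self-contained sketch of that mapping (both enums are redefined locally for illustration, standing in for KafkaSinkSemantic and FlinkKafkaProducer011.Semantic):

public class SemanticMappingSketch {
    enum KafkaSinkSemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }
    enum ProducerSemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    public static void main(String[] args) {
        // valueOf(toString()) only works because the constant names match exactly
        for (KafkaSinkSemantic s : KafkaSinkSemantic.values()) {
            ProducerSemantic p = ProducerSemantic.valueOf(s.toString());
            System.out.println(s + " -> " + p);
        }
    }
}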
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
index d928fe6..47ac725 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
@@ -63,13 +63,15 @@ public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
return new Kafka011DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
index 37ecf15..695dcd4 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
@@ -82,12 +82,14 @@ public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
return new Kafka011DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
index 44058d2..2d6e893 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
@@ -38,11 +38,10 @@ import java.util.Properties;
* A version-agnostic Kafka {@link DynamicTableSink}.
*
* <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional, KafkaSinkSemantic)}}.
*/
@Internal
public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
-
/** Consumed data type of the table. */
protected final DataType consumedDataType;
@@ -58,17 +57,22 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
/** Partitioner to select Kafka partition for each item. */
protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
+ /** Sink commit semantic. */
+ protected final KafkaSinkSemantic semantic;
+
protected KafkaDynamicSinkBase(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
+ this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
}
@Override
@@ -85,7 +89,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
this.topic,
properties,
serializationSchema,
- this.partitioner);
+ this.partitioner,
+ this.semantic);
return SinkFunctionProvider.of(kafkaProducer);
}
@@ -103,7 +108,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
- Optional<FlinkKafkaPartitioner<RowData>> partitioner);
+ Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+ KafkaSinkSemantic semantic);
@Override
public boolean equals(Object o) {
@@ -118,7 +124,8 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
Objects.equals(topic, that.topic) &&
Objects.equals(properties, that.properties) &&
Objects.equals(encodingFormat, that.encodingFormat) &&
- Objects.equals(partitioner, that.partitioner);
+ Objects.equals(partitioner, that.partitioner) &&
+ Objects.equals(semantic, that.semantic);
}
@Override
@@ -128,6 +135,7 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
topic,
properties,
encodingFormat,
- partitioner);
+ partitioner,
+ semantic);
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
index 1730809..2735e48 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
@@ -43,15 +43,19 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableOptions;
@@ -74,12 +78,12 @@ public abstract class KafkaDynamicTableFactoryBase implements
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// Validate the option data type.
- helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX);
// Validate the option values.
validateTableOptions(tableOptions);
DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
- final KafkaOptions.StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
+ final StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
return createKafkaTableSource(
producedDataType,
topic,
@@ -101,7 +105,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
SerializationFormatFactory.class,
FactoryUtil.FORMAT);
// Validate the option data type.
- helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX);
// Validate the option values.
validateTableOptions(tableOptions);
@@ -111,7 +115,8 @@ public abstract class KafkaDynamicTableFactoryBase implements
topic,
getKafkaProperties(context.getCatalogTable().getOptions()),
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
- encodingFormat);
+ encodingFormat,
+ getSinkSemantic(tableOptions));
}
/**
@@ -148,7 +153,8 @@ public abstract class KafkaDynamicTableFactoryBase implements
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat);
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic);
@Override
public Set<ConfigOption<?>> requiredOptions() {
@@ -167,6 +173,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SINK_PARTITIONER);
+ options.add(SINK_SEMANTIC);
return options;
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 337fe76..4661e17 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -39,6 +39,10 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.EXACTLY_ONCE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic.NONE;
+
/** Option utils for Kafka table source sink. */
public class KafkaOptions {
private KafkaOptions() {}
@@ -103,6 +107,11 @@ public class KafkaOptions {
+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
+ public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic")
+ .stringType()
+ .defaultValue("at-least-once")
+ .withDescription("Optional delivery semantic to use on commit. Valid enumerations are [\"at-least-once\", \"exactly-once\", \"none\"]");
+
// --------------------------------------------------------------------------------------------
// Option enumerations
// --------------------------------------------------------------------------------------------
@@ -129,6 +138,17 @@ public class KafkaOptions {
SINK_PARTITIONER_VALUE_FIXED,
SINK_PARTITIONER_VALUE_ROUND_ROBIN));
+ // Sink semantic
+ public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";
+ public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";
+ public static final String SINK_SEMANTIC_VALUE_NONE = "none";
+
+ private static final Set<String> SINK_SEMANTIC_ENUMS = new HashSet<>(Arrays.asList(
+ SINK_SEMANTIC_VALUE_AT_LEAST_ONCE,
+ SINK_SEMANTIC_VALUE_EXACTLY_ONCE,
+ SINK_SEMANTIC_VALUE_NONE
+ ));
+
// Prefix for Kafka specific properties.
public static final String PROPERTIES_PREFIX = "properties.";
@@ -143,6 +163,7 @@ public class KafkaOptions {
public static void validateTableOptions(ReadableConfig tableOptions) {
validateScanStartupMode(tableOptions);
validateSinkPartitioner(tableOptions);
+ validateSinkSemantic(tableOptions);
}
private static void validateScanStartupMode(ReadableConfig tableOptions) {
@@ -191,10 +212,33 @@ public class KafkaOptions {
});
}
+ private static void validateSinkSemantic(ReadableConfig tableOptions) {
+ tableOptions.getOptional(SINK_SEMANTIC).ifPresent(semantic -> {
+ if (!SINK_SEMANTIC_ENUMS.contains(semantic)) {
+ throw new ValidationException(
+ String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
+ semantic, SINK_SEMANTIC.key()));
+ }
+ });
+ }
+
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
+ public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions) {
+ switch (tableOptions.get(SINK_SEMANTIC)) {
+ case SINK_SEMANTIC_VALUE_EXACTLY_ONCE:
+ return EXACTLY_ONCE;
+ case SINK_SEMANTIC_VALUE_AT_LEAST_ONCE:
+ return AT_LEAST_ONCE;
+ case SINK_SEMANTIC_VALUE_NONE:
+ return NONE;
+ default:
+ throw new TableException("Validator should have checked that the semantic is one of the supported values");
+ }
+ }
+
public static StartupOptions getStartupOptions(
ReadableConfig tableOptions,
String topic) {
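The validate-then-convert flow in KafkaOptions above, condensed into a standalone sketch (IllegalArgumentException and IllegalStateException stand in for Flink's ValidationException and TableException):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SinkSemanticOptionSketch {
    enum KafkaSinkSemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    private static final Set<String> SINK_SEMANTIC_ENUMS = new HashSet<>(Arrays.asList(
        "at-least-once", "exactly-once", "none"));

    // mirrors validateSinkSemantic: reject anything outside the enumeration
    static void validateSinkSemantic(String value) {
        if (!SINK_SEMANTIC_ENUMS.contains(value)) {
            throw new IllegalArgumentException(String.format(
                "Unsupported value '%s' for 'sink.semantic'. "
                + "Supported values are ['at-least-once', 'exactly-once', 'none'].", value));
        }
    }

    // mirrors getSinkSemantic: map the already-validated string onto the enum
    static KafkaSinkSemantic getSinkSemantic(String value) {
        switch (value) {
            case "exactly-once":  return KafkaSinkSemantic.EXACTLY_ONCE;
            case "at-least-once": return KafkaSinkSemantic.AT_LEAST_ONCE;
            case "none":          return KafkaSinkSemantic.NONE;
            default: throw new IllegalStateException("Validator should have checked that");
        }
    }

    public static void main(String[] args) {
        validateSinkSemantic("exactly-once");
        System.out.println(getSinkSemantic("exactly-once")); // EXACTLY_ONCE
    }
}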
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java
new file mode 100644
index 0000000..908f215
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaSinkSemantic.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Kafka sink semantic enum.
+ */
+@Internal
+public enum KafkaSinkSemantic {
+ /**
+ * With EXACTLY_ONCE semantic, the Flink producer will write all messages in a Kafka transaction that will be
+ * committed to Kafka on a checkpoint.
+ */
+ EXACTLY_ONCE,
+ /**
+ * With AT_LEAST_ONCE semantic, the Flink producer will wait for all outstanding messages in the Kafka buffers
+ * to be acknowledged by the Kafka producer on a checkpoint.
+ */
+ AT_LEAST_ONCE,
+ /**
+ * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+ * of failure.
+ */
+ NONE
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 09372b1..21b81ea 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -87,6 +87,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
private static final String COMPUTED_COLUMN_NAME = "computed-column";
private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
+ private static final String SEMANTIC = "exactly-once";
private static final Properties KAFKA_PROPERTIES = new Properties();
static {
@@ -210,7 +211,9 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
- encodingFormat);
+ encodingFormat,
+ KafkaSinkSemantic.EXACTLY_ONCE
+ );
assertEquals(expectedSink, actualSink);
// Test sink format.
@@ -326,6 +329,29 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
Thread.currentThread().getContextClassLoader());
}
+ @Test
+ public void testInvalidSinkSemantic() {
+ ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+ "default",
+ "default",
+ "sinkTable");
+
+ final Map<String, String> modifiedOptions = getModifiedOptions(
+ getFullSourceOptions(),
+ options -> {
+ options.put("sink.semantic", "xyz");
+ });
+ final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+ thrown.expect(ValidationException.class);
+ thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported values are ['at-least-once', 'exactly-once', 'none'].")));
+ FactoryUtil.createTableSink(
+ null,
+ objectIdentifier,
+ sinkTable,
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader());
+ }
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@@ -342,7 +368,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable");
}
- private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
+ protected CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable");
}
@@ -351,14 +377,14 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
*
* @param optionModifier Consumer to modify the options
*/
- private static Map<String, String> getModifiedOptions(
+ protected static Map<String, String> getModifiedOptions(
Map<String, String> options,
Consumer<Map<String, String>> optionModifier) {
optionModifier.accept(options);
return options;
}
- private Map<String, String> getFullSourceOptions() {
+ protected Map<String, String> getFullSourceOptions() {
Map<String, String> tableOptions = new HashMap<>();
// Kafka specific options.
tableOptions.put("connector", factoryIdentifier());
@@ -378,7 +404,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
return tableOptions;
}
- private Map<String, String> getFullSinkOptions() {
+ protected Map<String, String> getFullSinkOptions() {
Map<String, String> tableOptions = new HashMap<>();
// Kafka specific options.
tableOptions.put("connector", factoryIdentifier());
@@ -386,6 +412,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
tableOptions.put("properties.group.id", "dummy");
tableOptions.put("properties.bootstrap.servers", "dummy");
tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+ tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
// Format options.
tableOptions.put("format", TestFormatFactory.IDENTIFIER);
final String formatDelimiterKey = String.format("%s.%s",
@@ -419,6 +446,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic
);
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index aea837f..ec50a00 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -41,13 +41,15 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
super(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
@Override
@@ -55,12 +57,15 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
- Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+ Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+ KafkaSinkSemantic semantic) {
return new FlinkKafkaProducer<>(
topic,
serializationSchema,
properties,
- partitioner);
+ partitioner.orElse(null),
+ FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
@Override
@@ -70,7 +75,8 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
this.topic,
this.properties,
this.partitioner,
- this.encodingFormat);
+ this.encodingFormat,
+ this.semantic);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 7242238..2a9d640 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -62,13 +62,15 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
return new KafkaDynamicSink(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 785578a..adf2ac2 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -81,12 +81,14 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
- EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+ EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+ KafkaSinkSemantic semantic) {
return new KafkaDynamicSink(
consumedDataType,
topic,
properties,
partitioner,
- encodingFormat);
+ encodingFormat,
+ semantic);
}
}