Posted to issues@flink.apache.org by "chaiyongqiang (Jira)" <ji...@apache.org> on 2019/11/12 08:54:00 UTC
[jira] [Commented] (FLINK-14719) Making Semantic configurable in
Flinkkafkaproducer to support exactly-once semantic in Table API
[ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191 ]
chaiyongqiang commented on FLINK-14719:
---------------------------------------
When I checked the code on the Flink 1.9 branch and on master, the constructor used in FlinkKafkaProducer 1.8 has become @deprecated. In the newer versions of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase and all the classes that extend KafkaTableSinkBase to support the exactly-once semantic in the Table API. +I could open a new issue to track this.+
But for the flink 1.8 branch, a lightweight approach would help. We could read the semantic from the producer configuration and set it in the constructor in the following way.
{code:java}
	/**
	 * Configuration key for disabling the metrics reporting.
	 */
	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

	/**
	 * Configuration key for setting the producer semantic.
	 */
	public static final String KEY_SEMANTIC = "flink.semantic";

	public FlinkKafkaProducer(
			String defaultTopicId,
			KeyedSerializationSchema<IN> serializationSchema,
			Properties producerConfig,
			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
		this(
			defaultTopicId,
			serializationSchema,
			producerConfig,
			customPartitioner,
			Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
	}
{code}
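With that change, a user could opt into exactly-once just by setting a property on the producer configuration. The sketch below only illustrates the proposed lookup behavior; the nested Semantic enum is a local stand-in for FlinkKafkaProducer.Semantic so the example runs without the Flink dependency:

{code:java}
import java.util.Properties;

public class SemanticConfigExample {

	// Stand-in for FlinkKafkaProducer.Semantic; the real enum lives in Flink.
	enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

	static final String KEY_SEMANTIC = "flink.semantic";

	// Mirrors the lookup proposed above: read the property, default to AT_LEAST_ONCE.
	static Semantic resolveSemantic(Properties producerConfig) {
		return Semantic.valueOf(
			producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase());
	}

	public static void main(String[] args) {
		Properties producerConfig = new Properties();
		// Without the key, behavior stays at-least-once, so existing jobs are unaffected.
		System.out.println(resolveSemantic(producerConfig));  // AT_LEAST_ONCE

		// Opting in; lower case is accepted because the value is upper-cased first.
		producerConfig.setProperty(KEY_SEMANTIC, "exactly_once");
		System.out.println(resolveSemantic(producerConfig));  // EXACTLY_ONCE
	}
}
{code}

Note that an unknown value (e.g. a typo like "exactly-once") would make Enum.valueOf throw IllegalArgumentException at construction time, which at least fails fast rather than silently downgrading the guarantee.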
Could someone offer me some advice? Many thanks.
> Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-14719
> URL: https://issues.apache.org/jira/browse/FLINK-14719
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.8.0
> Reporter: chaiyongqiang
> Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and FlinkKafkaProducer011. When we use the DataStream API, it is possible to achieve exactly-once semantics. But when we use the Table API, things are different.
> The createKafkaProducer method in KafkaTableSink is used to create a FlinkKafkaProducer for sending messages to the Kafka server. It looks like:
> {code:java}
> 	protected SinkFunction<Row> createKafkaProducer(
> 			String topic,
> 			Properties properties,
> 			SerializationSchema<Row> serializationSchema,
> 			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
> 		return new FlinkKafkaProducer<>(
> 			topic,
> 			new KeyedSerializationSchemaWrapper<>(serializationSchema),
> 			properties,
> 			partitioner);
> 	}
> {code}
> When we step into the constructor of FlinkKafkaProducer, we can see that this leads to a producer with AT_LEAST_ONCE semantic:
> {code:java}
> 	public FlinkKafkaProducer(
> 			String defaultTopicId,
> 			KeyedSerializationSchema<IN> serializationSchema,
> 			Properties producerConfig,
> 			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> 		this(
> 			defaultTopicId,
> 			serializationSchema,
> 			producerConfig,
> 			customPartitioner,
> 			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
> 			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> 	}
> {code}
> This means users cannot achieve exactly-once semantics when using the Table API.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)