Posted to issues@flink.apache.org by "chaiyongqiang (Jira)" <ji...@apache.org> on 2019/11/12 08:54:00 UTC

[jira] [Commented] (FLINK-14719) Making Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API

    [ https://issues.apache.org/jira/browse/FLINK-14719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972191#comment-16972191 ] 

chaiyongqiang commented on FLINK-14719:
---------------------------------------

When I checked the code on the Flink 1.9 branch and on master, the FlinkKafkaProducer constructor used in 1.8 had been marked @Deprecated. In newer versions of Flink, we could modify the createKafkaProducer method in KafkaTableSinkBase, and in all the classes that extend KafkaTableSinkBase, to support exactly-once semantics. +I could open a new issue to track this.+

For the Flink 1.8 branch, however, a lightweight approach would help. We could read the semantic from the producer config and pass it to the constructor in the following way.
{code:java}

/**
* Configuration key for disabling the metrics reporting.
*/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
* Configuration key for setting the producer semantic.
*/
public static final String KEY_SEMANTIC = "flink.semantic";

public FlinkKafkaProducer(
		String defaultTopicId,
		KeyedSerializationSchema<IN> serializationSchema,
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
	this(
		defaultTopicId,
		serializationSchema,
		producerConfig,
		customPartitioner,
		// Read the semantic from the producer config; fall back to AT_LEAST_ONCE when the key is absent.
		Semantic.valueOf(producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name()).toUpperCase()),
		DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
{code}
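
To make the lookup above easier to try in isolation, here is a minimal stand-alone sketch of the same idea. It is only an illustration and not Flink code: the class SemanticConfig and the method fromProperties are hypothetical names, and the nested Semantic enum is a stand-in that mirrors the constant names of FlinkKafkaProducer.Semantic. Trimming the value, accepting hyphens in place of underscores, and the custom error message are extra assumptions beyond the snippet above.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical helper; not part of Flink. */
final class SemanticConfig {

    /** Stand-in for FlinkKafkaProducer.Semantic (same constant names). */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Property key proposed in this comment. */
    static final String KEY_SEMANTIC = "flink.semantic";

    private SemanticConfig() {
    }

    /**
     * Resolves the semantic from the producer properties.
     * Falls back to AT_LEAST_ONCE when the key is not set.
     */
    static Semantic fromProperties(Properties producerConfig) {
        String raw = producerConfig.getProperty(KEY_SEMANTIC, Semantic.AT_LEAST_ONCE.name());
        // Assumption: accept any case and "exactly-once" style spellings as well.
        String normalized = raw.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        try {
            return Semantic.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            // Enum.valueOf rejects unknown names; rethrow with the offending key and value.
            throw new IllegalArgumentException(
                "Unsupported value '" + raw + "' for " + KEY_SEMANTIC
                    + "; expected one of EXACTLY_ONCE, AT_LEAST_ONCE, NONE", e);
        }
    }
}
```

With this shape, a job that does not set flink.semantic keeps the current at-least-once behaviour, while a misspelled value fails at construction time with a message that names the key instead of a bare enum lookup error.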

Could someone offer me some advice? Many thanks.

> Making  Semantic configurable in Flinkkafkaproducer to support exactly-once semantic in Table API
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14719
>                 URL: https://issues.apache.org/jira/browse/FLINK-14719
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.8.0
>            Reporter: chaiyongqiang
>            Priority: Major
>
> Flink supports Kafka transactions with FlinkKafkaProducer and FlinkKafkaProducer011. With the DataStream API, this makes exactly-once semantics achievable, but with the Table API things are different.
> The createKafkaProducer method in KafkaTableSink is used to create the FlinkKafkaProducer that sends messages to the Kafka server. It looks like this:
> {code:java}
> protected SinkFunction<Row> createKafkaProducer(
> 		String topic,
> 		Properties properties,
> 		SerializationSchema<Row> serializationSchema,
> 		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
> 		return new FlinkKafkaProducer<>(
> 			topic,
> 			new KeyedSerializationSchemaWrapper<>(serializationSchema),
> 			properties,
> 			partitioner);
> 	}
> {code}
> Looking at the FlinkKafkaProducer constructor that this calls, we can see that it always results in a producer with at-least-once semantics:
> {code:java}
> 	public FlinkKafkaProducer(
> 		String defaultTopicId,
> 		KeyedSerializationSchema<IN> serializationSchema,
> 		Properties producerConfig,
> 		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
> 		this(
> 			defaultTopicId,
> 			serializationSchema,
> 			producerConfig,
> 			customPartitioner,
> 			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
> 			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> 	}
> {code}
> As a result, users cannot achieve exactly-once semantics when using the Table API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)