You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 静谧雨寒 <fr...@vip.qq.com> on 2020/07/01 07:41:12 UTC

回复: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

感谢,看了下issues, Fix Version/s:None ,不知何时才能加上,还是老老实实用dataStream


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"方盛凯"<fskmine@gmail.com&gt;;
发送时间:&nbsp;2020年7月1日(星期三) 下午3:31
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"夏帅"<jkillers@dingtalk.com&gt;;

主题:&nbsp;Re: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?



我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221

夏帅 <jkillers@dingtalk.com.invalid&gt; 于2020年7月1日周三 下午3:13写道:

&gt; 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once
&gt;
&gt; Kafka011TableSink
&gt;
&gt;
&gt; @Override
&gt; protected SinkFunction<Row&gt; createKafkaProducer(
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String topic,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; SerializationSchema<Row&gt; serializationSchema,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Optional<FlinkKafkaPartitioner<Row&gt;&gt; partitioner) {
&gt;&nbsp;&nbsp;&nbsp; return new FlinkKafkaProducer011<&gt;(
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; topic,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new KeyedSerializationSchemaWrapper<&gt;(serializationSchema),
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; properties,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; partitioner,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 5);
&gt; }
&gt; 如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase
&gt;
&gt; 参考:
&gt; https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
&gt; ------------------------------------------------------------------
&gt; 发件人:静谧雨寒 <freedom0083@vip.qq.com&gt;
&gt; 发送时间:2020年7月1日(星期三) 14:33
&gt; 收件人:user-zh <user-zh@flink.apache.org&gt;
&gt; 主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?
&gt;
&gt; &amp;nbsp;flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql
&gt; sink表使用两阶事务提交,exactly-once一致性保证 ?
&gt; 官档说法:
&gt; Consistency guarantees: By default, a Kafka sink ingests data with
&gt; at-least-once guarantees into a Kafka topic if the query is executed with
&gt; checkpointing enabled.,&amp;nbsp;&amp;nbsp;
&gt; CREATE TABLE 默认是 at-least-once
&gt;
&gt;