You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 悟空 <wu...@foxmail.com> on 2021/08/26 03:31:08 UTC

基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

hi all:&nbsp;
&nbsp; &nbsp; 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现 在一个事务里 先将kafka 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
&nbsp; &nbsp;语句类似这种:
&nbsp; &nbsp;insert into db_table_sink&nbsp;select * from&nbsp; kafka_source_table;
&nbsp; &nbsp;insert into kafka_table_sink select * from kafka_source_table;


&nbsp; 请问flink SQL 有实现方式吗? 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink 程序没有挂掉。

回复: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by "wukong91@foxmail.com" <wu...@foxmail.com>.
我目前大概会采用 Shuo Cheng 提到的, 使用先sink 到Mysql, 再启一个任务 cdc mysql 中的表,这样能保证插入成功后的数据。

我目前使用的是flink 1.12 版本 如果是多端sink 比如 sink db 同时sink kafka ,flink 在sink db 失败,依旧会sink kafka 但是会因为异常,发生tm 重启,会根据自定义重启策略,一直到最后 整个job fail over 掉。



wukong91@foxmail.com
 
发件人: 东东
发送时间: 2021-08-30 16:50
收件人: user-zh
主题: Re:Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
 
 
 
对于Exactly-Once模式下的checkpoint,如果sink端支持两段式事务,应该就可以做到一个sink失败,整个checkpoint失败的。
 
 
不过JDBC sink支持Exactly-Once是1.13.0以后的事情了,建议检查一下你的版本和配置
 
 
 
 
 
 
 
 
 
在 2021-08-30 16:27:24,"wukong91@foxmail.com" <wu...@foxmail.com> 写道:
>Hi: 
> 我理解这种方式, 目前我只是想让先插入数据到Mysql 然后再通过通知到下游,来查询Mysql 进行数据etl 不知道大家如何通过SQL来实现这一逻辑
>
>
>
>wukong91@foxmail.com
> 
>发件人: Shuo Cheng
>发送时间: 2021-08-30 10:19
>收件人: user-zh
>主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
>Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
>表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
>Kafka sink.
> 
>On 8/26/21, jie han <ba...@gmail.com> wrote:
>> HI:
>> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>>
>> 悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
>>
>>> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
>>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
>>> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
>>> 同时下游consumer 使用&nbsp;--isolation-level read_committed
>>> 读取,依旧能成功读取到数据,说明sink
>>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>>
>>>
>>>
>>>
>>> ------------------&nbsp;原始邮件&nbsp;------------------
>>> 发件人:
>>>                                                   "user-zh"
>>>                                                                     <
>>> tsreaper96@gmail.com&gt;;
>>> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
>>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>>
>>> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>>
>>>
>>>
>>> Hi!
>>>
>>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>>> db
>>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>>
>>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>>> Flink CDC connector[1]
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>
>>> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>>>
>>> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>>> &gt; 加入的,然后执行execute()方法
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
>>> &gt; 发件人:
>>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> "user-zh"
>>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> <
>>> &gt; fskmine@gmail.com&amp;gt;;
>>> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
>>> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
>>> &gt;
>>> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; 说的是 statement set [1] 吗 ?
>>> &gt;
>>> &gt; [1]
>>> &gt;
>>> &gt;
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>>> &gt
>>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
>>> ;
>>> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
>>> &gt;
>>> &gt; &amp;gt; hi all:&amp;amp;nbsp;
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>>> 目前遇到一个问题, 我现在想实现
>>> &gt; 在一个事务里 先将kafka
>>> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
>>> db_table_sink&amp;amp;nbsp;select *
>>> &gt; from&amp;amp;nbsp;
>>> &gt; &amp;gt; kafka_source_table;
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
>>> select * from
>>> &gt; kafka_source_table;
>>> &gt; &amp;gt;
>>> &gt; &amp;gt;
>>> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
>>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>>> &gt; 程序没有挂掉。
>>

Re:Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by 东东 <do...@163.com>.


对于Exactly-Once模式下的checkpoint,如果sink端支持两段式事务,应该就可以做到一个sink失败,整个checkpoint失败的。


不过JDBC sink支持Exactly-Once是1.13.0以后的事情了,建议检查一下你的版本和配置









在 2021-08-30 16:27:24,"wukong91@foxmail.com" <wu...@foxmail.com> 写道:
>Hi: 
> 我理解这种方式, 目前我只是想让先插入数据到Mysql 然后再通过通知到下游,来查询Mysql 进行数据etl 不知道大家如何通过SQL来实现这一逻辑
>
>
>
>wukong91@foxmail.com
> 
>发件人: Shuo Cheng
>发送时间: 2021-08-30 10:19
>收件人: user-zh
>主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
>Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
>表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
>Kafka sink.
> 
>On 8/26/21, jie han <ba...@gmail.com> wrote:
>> HI:
>> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>>
>> 悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
>>
>>> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
>>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
>>> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
>>> 同时下游consumer 使用&nbsp;--isolation-level read_committed
>>> 读取,依旧能成功读取到数据,说明sink
>>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>>
>>>
>>>
>>>
>>> ------------------&nbsp;原始邮件&nbsp;------------------
>>> 发件人:
>>>                                                   "user-zh"
>>>                                                                     <
>>> tsreaper96@gmail.com&gt;;
>>> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
>>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>>
>>> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>>
>>>
>>>
>>> Hi!
>>>
>>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>>> db
>>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>>
>>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>>> Flink CDC connector[1]
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>
>>> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>>>
>>> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>>> &gt; 加入的,然后执行execute()方法
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
>>> &gt; 发件人:
>>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> "user-zh"
>>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>>> <
>>> &gt; fskmine@gmail.com&amp;gt;;
>>> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
>>> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
>>> &gt;
>>> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>> &gt;
>>> &gt;
>>> &gt;
>>> &gt; 说的是 statement set [1] 吗 ?
>>> &gt;
>>> &gt; [1]
>>> &gt;
>>> &gt;
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>>> &gt
>>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
>>> ;
>>> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
>>> &gt;
>>> &gt; &amp;gt; hi all:&amp;amp;nbsp;
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>>> 目前遇到一个问题, 我现在想实现
>>> &gt; 在一个事务里 先将kafka
>>> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
>>> db_table_sink&amp;amp;nbsp;select *
>>> &gt; from&amp;amp;nbsp;
>>> &gt; &amp;gt; kafka_source_table;
>>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
>>> select * from
>>> &gt; kafka_source_table;
>>> &gt; &amp;gt;
>>> &gt; &amp;gt;
>>> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
>>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>>> &gt; 程序没有挂掉。
>>

Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by "wukong91@foxmail.com" <wu...@foxmail.com>.
Hi: 
 我理解这种方式, 目前我只是想让先插入数据到Mysql 然后再通过通知到下游,来查询Mysql 进行数据etl 不知道大家如何通过SQL来实现这一逻辑



wukong91@foxmail.com
 
发件人: Shuo Cheng
发送时间: 2021-08-30 10:19
收件人: user-zh
主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.
 
On 8/26/21, jie han <ba...@gmail.com> wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用&nbsp;--isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> ------------------&nbsp;原始邮件&nbsp;------------------
>> 发件人:
>>                                                   "user-zh"
>>                                                                     <
>> tsreaper96@gmail.com&gt;;
>> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>
>> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>>
>> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>> &gt; 加入的,然后执行execute()方法
>> &gt;
>> &gt;
>> &gt;
>> &gt;
>> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
>> &gt; 发件人:
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> "user-zh"
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> <
>> &gt; fskmine@gmail.com&amp;gt;;
>> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
>> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
>> &gt;
>> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> &gt;
>> &gt;
>> &gt;
>> &gt; 说的是 statement set [1] 吗 ?
>> &gt;
>> &gt; [1]
>> &gt;
>> &gt;
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> &gt
>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
>> ;
>> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
>> &gt;
>> &gt; &amp;gt; hi all:&amp;amp;nbsp;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>> &gt; 在一个事务里 先将kafka
>> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
>> db_table_sink&amp;amp;nbsp;select *
>> &gt; from&amp;amp;nbsp;
>> &gt; &amp;gt; kafka_source_table;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
>> select * from
>> &gt; kafka_source_table;
>> &gt; &amp;gt;
>> &gt; &amp;gt;
>> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>> &gt; 程序没有挂掉。
>

Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by Shuo Cheng <nj...@gmail.com>.
你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.

On 8/26/21, jie han <ba...@gmail.com> wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用&nbsp;--isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> ------------------&nbsp;原始邮件&nbsp;------------------
>> 发件人:
>>                                                   "user-zh"
>>                                                                     <
>> tsreaper96@gmail.com&gt;;
>> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
>> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>>
>> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>>
>> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>> &gt; 加入的,然后执行execute()方法
>> &gt;
>> &gt;
>> &gt;
>> &gt;
>> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
>> &gt; 发件人:
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> "user-zh"
>> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
>> <
>> &gt; fskmine@gmail.com&amp;gt;;
>> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
>> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
>> &gt;
>> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> &gt;
>> &gt;
>> &gt;
>> &gt; 说的是 statement set [1] 吗 ?
>> &gt;
>> &gt; [1]
>> &gt;
>> &gt;
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> &gt
>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
>> ;
>> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
>> &gt;
>> &gt; &amp;gt; hi all:&amp;amp;nbsp;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>> &gt; 在一个事务里 先将kafka
>> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
>> db_table_sink&amp;amp;nbsp;select *
>> &gt; from&amp;amp;nbsp;
>> &gt; &amp;gt; kafka_source_table;
>> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
>> select * from
>> &gt; kafka_source_table;
>> &gt; &amp;gt;
>> &gt; &amp;gt;
>> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>> &gt; 程序没有挂掉。
>

Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by "wukong91@foxmail.com" <wu...@foxmail.com>.
能具体说下如何实现吗? 我用cdc 能实现什么,我现在想让两个Insert Sql 保持到一个事务里, 要么全成功,要么全失败,目前查看Flink 文档 并没有发现相关的解释



wukong91@foxmail.com
 
发件人: jie han
发送时间: 2021-08-26 21:36
收件人: user-zh
主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
HI:
可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
 
悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:
 
> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
> 同时下游consumer 使用&nbsp;--isolation-level read_committed 读取,依旧能成功读取到数据,说明sink
> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> tsreaper96@gmail.com&gt;;
> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>
>
>
> Hi!
>
> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db
> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>
> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
> Flink CDC connector[1]
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>
> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
> &gt; 加入的,然后执行execute()方法
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <
> &gt; fskmine@gmail.com&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
> &gt;
> &gt;
> &gt;
> &gt; 说的是 statement set [1] 吗 ?
> &gt;
> &gt; [1]
> &gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
> &gt
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
> ;
> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
> &gt;
> &gt; &amp;gt; hi all:&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
> 目前遇到一个问题, 我现在想实现
> &gt; 在一个事务里 先将kafka
> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
> db_table_sink&amp;amp;nbsp;select *
> &gt; from&amp;amp;nbsp;
> &gt; &amp;gt; kafka_source_table;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
> select * from
> &gt; kafka_source_table;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
> &gt; 程序没有挂掉。

Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by jie han <ba...@gmail.com>.
HI:
可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀

悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午1:54写道:

> 我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11,
> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
> 但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once',
> 同时下游consumer 使用&nbsp;--isolation-level read_committed 读取,依旧能成功读取到数据,说明sink
> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> tsreaper96@gmail.com&gt;;
> 发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>
>
>
> Hi!
>
> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db
> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>
> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
> Flink CDC connector[1]
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:
>
> &gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
> &gt; 加入的,然后执行execute()方法
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "user-zh"
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> <
> &gt; fskmine@gmail.com&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
> &gt;
> &gt;
> &gt;
> &gt; 说的是 statement set [1] 吗 ?
> &gt;
> &gt; [1]
> &gt;
> &gt;
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
> &gt
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements&gt>
> ;
> &gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
> &gt;
> &gt; &amp;gt; hi all:&amp;amp;nbsp;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
> 目前遇到一个问题, 我现在想实现
> &gt; 在一个事务里 先将kafka
> &gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into
> db_table_sink&amp;amp;nbsp;select *
> &gt; from&amp;amp;nbsp;
> &gt; &amp;gt; kafka_source_table;
> &gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink
> select * from
> &gt; kafka_source_table;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗?
> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
> &gt; 程序没有挂掉。

回复: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by 悟空 <wu...@foxmail.com>.
我目前用的是flink-connector-kafka_2.11&nbsp;和&nbsp;flink-connector-jdbc_2.11, 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段, 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。&nbsp;
但是接着sink Kafka 是成功的,Kafka端 我开启了&nbsp;'sink.semantic' = 'exactly-once', 同时下游consumer 使用&nbsp;--isolation-level read_committed 读取,依旧能成功读取到数据,说明sink db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <tsreaper96@gmail.com&gt;;
发送时间:&nbsp;2021年8月26日(星期四) 中午1:25
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中



Hi!

如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db
失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?

另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
Flink CDC connector[1]

[1] https://github.com/ververica/flink-cdc-connectors

悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 下午12:52写道:

&gt; 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
&gt; 加入的,然后执行execute()方法
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <
&gt; fskmine@gmail.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2021年8月26日(星期四) 中午12:36
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
&gt;
&gt;
&gt;
&gt; 说的是 statement set [1] 吗 ?
&gt;
&gt; [1]
&gt;
&gt; https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
&gt;
&gt; 悟空 <wukong91@foxmail.com&amp;gt; 于2021年8月26日周四 上午11:33写道:
&gt;
&gt; &amp;gt; hi all:&amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现
&gt; 在一个事务里 先将kafka
&gt; &amp;gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;语句类似这种:
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into db_table_sink&amp;amp;nbsp;select *
&gt; from&amp;amp;nbsp;
&gt; &amp;gt; kafka_source_table;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp;insert into kafka_table_sink select * from
&gt; kafka_source_table;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp; 请问flink SQL 有实现方式吗? 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
&gt; 程序没有挂掉。

Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db
失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?

另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
Flink CDC connector[1]

[1] https://github.com/ververica/flink-cdc-connectors

悟空 <wu...@foxmail.com> 于2021年8月26日周四 下午12:52写道:

> 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
> 加入的,然后执行execute()方法
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> fskmine@gmail.com&gt;;
> 发送时间:&nbsp;2021年8月26日(星期四) 中午12:36
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>
>
>
> 说的是 statement set [1] 吗 ?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>
> 悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 上午11:33写道:
>
> &gt; hi all:&amp;nbsp;
> &gt; &amp;nbsp; &amp;nbsp; 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现
> 在一个事务里 先将kafka
> &gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
> &gt; &amp;nbsp; &amp;nbsp;语句类似这种:
> &gt; &amp;nbsp; &amp;nbsp;insert into db_table_sink&amp;nbsp;select *
> from&amp;nbsp;
> &gt; kafka_source_table;
> &gt; &amp;nbsp; &amp;nbsp;insert into kafka_table_sink select * from
> kafka_source_table;
> &gt;
> &gt;
> &gt; &amp;nbsp; 请问flink SQL 有实现方式吗? 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
> 程序没有挂掉。

回复: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by 悟空 <wu...@foxmail.com>.
能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql 加入的,然后执行execute()方法




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <fskmine@gmail.com&gt;;
发送时间:&nbsp;2021年8月26日(星期四) 中午12:36
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中



说的是 statement set [1] 吗 ?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements

悟空 <wukong91@foxmail.com&gt; 于2021年8月26日周四 上午11:33写道:

&gt; hi all:&amp;nbsp;
&gt; &amp;nbsp; &amp;nbsp; 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现 在一个事务里 先将kafka
&gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
&gt; &amp;nbsp; &amp;nbsp;语句类似这种:
&gt; &amp;nbsp; &amp;nbsp;insert into db_table_sink&amp;nbsp;select * from&amp;nbsp;
&gt; kafka_source_table;
&gt; &amp;nbsp; &amp;nbsp;insert into kafka_table_sink select * from kafka_source_table;
&gt;
&gt;
&gt; &amp;nbsp; 请问flink SQL 有实现方式吗? 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink 程序没有挂掉。

Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

Posted by Shengkai Fang <fs...@gmail.com>.
说的是 statement set [1] 吗 ?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements

悟空 <wu...@foxmail.com> 于2021年8月26日周四 上午11:33写道:

> hi all:&nbsp;
> &nbsp; &nbsp; 我目前基于flink 1.12 sql 来开发功能, 目前遇到一个问题, 我现在想实现 在一个事务里 先将kafka
> 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
> &nbsp; &nbsp;语句类似这种:
> &nbsp; &nbsp;insert into db_table_sink&nbsp;select * from&nbsp;
> kafka_source_table;
> &nbsp; &nbsp;insert into kafka_table_sink select * from kafka_source_table;
>
>
> &nbsp; 请问flink SQL 有实现方式吗? 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink 程序没有挂掉。