You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by a <80...@qq.com.INVALID> on 2021/10/25 16:17:30 UTC

flink写mysql问题

各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗

Re: flink写mysql问题

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Flink 1.11 对 jdbc 在流作业中的支持确实不完善,在流作业做 checkpoint 时没有处理。如果需要在流作业中使用 jdbc
sink,建议升级到比较新的 1.13 或 1.14。

zya <z_...@foxmail.com> 于2021年10月26日周二 下午4:56写道:

> 你好,感谢回复
> 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢?
>
>
> 我用的是1.11.2版本的flink
> sql,我发现数据写到外部直接使用的是BufferReduceStatementExecutor中的方法,同时在做检查点的时候不会触发到数据库的flush,好像没有使用到类GenericJdbcSinkFunction
> 那么如果遇到断电等问题,这部分数据是不是会丢失呢
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:
>                                                   "user-zh"
>                                                                     <
> tsreaper96@gmail.com&gt;;
> 发送时间:&nbsp;2021年10月26日(星期二) 上午10:31
> 收件人:&nbsp;"flink中文邮件组"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: flink写mysql问题
>
>
>
> Hi!
>
> 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢
>
>
> JdbcDynamicTableSink 不包含具体 sink function
> 的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
> jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。
>
> 写 mysql 的 qps 只能到几百,反压严重
>
>
> jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
> 就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。
>
> 算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成
>
>
> checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
> 也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows
>
> a <806040308@qq.com.invalid&gt; 于2021年10月26日周二 上午9:49写道:
>
> &gt;
> &gt;
> 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
> &gt;
> &gt;
> 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗

回复: flink写mysql问题

Posted by zya <z_...@foxmail.com>.
你好,感谢回复
在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢?


我用的是1.11.2版本的flink sql,我发现数据写到外部直接使用的是BufferReduceStatementExecutor中的方法,同时在做检查点的时候不会触发到数据库的flush,好像没有使用到类GenericJdbcSinkFunction
那么如果遇到断电等问题,这部分数据是不是会丢失呢




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <tsreaper96@gmail.com&gt;;
发送时间:&nbsp;2021年10月26日(星期二) 上午10:31
收件人:&nbsp;"flink中文邮件组"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: flink写mysql问题



Hi!

在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢


JdbcDynamicTableSink 不包含具体 sink function
的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。

写 mysql 的 qps 只能到几百,反压严重


jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。

算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成


checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows

a <806040308@qq.com.invalid&gt; 于2021年10月26日周二 上午9:49写道:

&gt;
&gt; 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
&gt;
&gt; 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗

Re: flink写mysql问题

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢


JdbcDynamicTableSink 不包含具体 sink function
的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。

写 mysql 的 qps 只能到几百,反压严重


jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。

算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成


checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows

a <80...@qq.com.invalid> 于2021年10月26日周二 上午9:49写道:

>
> 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
>
> 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗