You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by nicygan <re...@163.com> on 2020/06/18 09:04:55 UTC
flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据
dear all:
我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("123456")
.setQuery(sql2)
.setParameterTypes(types)
.setBatchSize(1000)
.build();
=========== 问题 ================
如果上游数据来源时间是:
10:00 -> 900条
10:10 -> 120条
11:50 -> 1100条
15:00 -> 900条
JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
10:10 -> 写入1000条,剩20条下次写入
11:50 -> 写入1000条,剩30条下次写入
15:00 -> 写入1000条,剩10条下次写入
我想要达到等待20分种,不满足batchSize也写入,能否实现?
10:10 -> 写入1000条,剩20条下次写入
10:30 -> 写入20条
11:50 -> 写入1000条,剩10条下次写入
12:10 -> 写入10条
15:20 -> 写入900条
thanks
Re:Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据
Posted by nicygan <re...@163.com>.
请问timeout值是多少?在哪里可设置?
在 2020-06-18 17:43:31,"Benchao Li" <li...@apache.org> 写道:
>我理解现在就是你想要的效果。
>batch-size和timeout两个条件是达到一个就会flush的。
>
>nicygan <re...@163.com> 于2020年6月18日周四 下午5:05写道:
>
>> dear all:
>> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>>
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>> .setDrivername("com.mysql.jdbc.Driver")
>> .setDBUrl("jdbc:mysql://localhost:3306/flink")
>> .setUsername("root")
>> .setPassword("123456")
>> .setQuery(sql2)
>> .setParameterTypes(types)
>> .setBatchSize(1000)
>> .build();
>>
>> =========== 问题 ================
>> 如果上游数据来源时间是:
>> 10:00 -> 900条
>> 10:10 -> 120条
>> 11:50 -> 1100条
>> 15:00 -> 900条
>>
>> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
>> 10:10 -> 写入1000条,剩20条下次写入
>> 11:50 -> 写入1000条,剩30条下次写入
>> 15:00 -> 写入1000条,剩10条下次写入
>>
>> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
>> 10:10 -> 写入1000条,剩20条下次写入
>> 10:30 -> 写入20条
>> 11:50 -> 写入1000条,剩10条下次写入
>> 12:10 -> 写入10条
>> 15:20 -> 写入900条
>>
>> thanks
>>
>
>
>--
>
>Best,
>Benchao Li
Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据
Posted by Benchao Li <li...@apache.org>.
我理解现在就是你想要的效果。
batch-size和timeout两个条件是达到一个就会flush的。
nicygan <re...@163.com> 于2020年6月18日周四 下午5:05写道:
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("jdbc:mysql://localhost:3306/flink")
> .setUsername("root")
> .setPassword("123456")
> .setQuery(sql2)
> .setParameterTypes(types)
> .setBatchSize(1000)
> .build();
>
> =========== 问题 ================
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks
>
--
Best,
Benchao Li
Re: flink1.7.2-JDBCAppendTableSink,如何按间隔时间写入数据
Posted by Leonard Xu <xb...@gmail.com>.
Hello
1.7.2是比较老的版本了, 可以考虑下升级新的版本,新的版本都支持你所需的功能的。
1.10.0 && 1.10.1 文档[1],对应的两个参数:
'connector.write.flush.max-rows' = '5000', -- optional, flush max size (includes all append, upsert and delete records),
-- over this number of records, will flush data. The default value is "5000".
'connector.write.flush.interval' = '2s', --optional, flush interval mills, over this time, asynchronous threads will flush data.
如果使用1.10版本,推荐使用1.10.1版本,1.10.1在1.10.0的基础上修复了一些bug。
社区即将发布的1.11.0 文档[2], 对应的两个参数:
sink.buffer-flush.max-rows -- The max size of buffered records before flush. Can be set to zero to disable it.
sink.buffer-flush.interval -- The flush interval mills, over this time, asynchronous threads will flush data.
-- Can be set to '0' to disable it. Note, 'sink.buffer-flush.max-rows' can be set to '0'
-- with the flush interval set allowing for complete async processing of buffered actions.
Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector>
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options>
> 在 2020年6月18日,17:04,nicygan <re...@163.com> 写道:
>
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("jdbc:mysql://localhost:3306/flink")
> .setUsername("root")
> .setPassword("123456")
> .setQuery(sql2)
> .setParameterTypes(types)
> .setBatchSize(1000)
> .build();
>
> =========== 问题 ================
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks