Posted to user-zh@flink.apache.org by 小学生 <20...@qq.com> on 2020/07/24 10:18:57 UTC

flink1.11: query results written to MySQL at only a few rows per second

Hi everyone, I have a question. I am using Flink 1.11 to consume Kafka data and write the query results to a MySQL table. Reading from Kafka is fast (about 300 rows per second), but only about 6 rows per second are written to MySQL. What could be causing this, and where can it be optimized? My code is below.


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Kafka source table (JSON messages)
source_Kafka = """
CREATE TABLE kafka_source (
  id VARCHAR,
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '*',
  'properties.group.id' = 'flink_grouper',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
"""

# MySQL sink table (JDBC connector)
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
  id VARCHAR,
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = 'detail',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '2s'
)
"""

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
table_result1 = t_env.execute_sql("insert into source_W_detail select id, alarm_id, trck_id from kafka_source")
table_result1.get_job_client().get_job_execution_result().result()

Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by "chengyanan1008@foxmail.com" <ch...@foxmail.com>.
Hello:
    I tested with your code, also on Flink 1.11.0. Apart from adjusting the fields (to match my Kafka data) nothing was changed, and the write speed was still quite fast.
    The code is entirely yours. MySQL showed no data at first because you set "sink.buffer-flush.max-rows" to 1000;
    after a few seconds, 1000 records were written into MySQL in one go, so your code itself is fine.
    I suggest you check the application logs or MySQL.
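    If rows need to show up in MySQL sooner, the flush thresholds on the JDBC sink can be lowered. A minimal sketch of the sink DDL with smaller buffer settings (host, credentials, and table are the ones from the original post; the values of 100 rows and 1s are illustrative assumptions, not recommendations):

# Flush after 100 buffered rows or 1 second, whichever comes first (illustrative values)
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
  id VARCHAR,
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = 'detail',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '1s'
)
"""
t_env.execute_sql(source_W_detail_ddl)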



chengyanan1008@foxmail.com
 
From: 小学生
Sent: 2020-07-24 18:18
To: user-zh
Subject: flink1.11: query results written to MySQL at only a few rows per second

Re: Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by 小学生 <20...@qq.com>.
Well, I have already compared that: with the same data, a Python program I wrote myself inserts about 150 records per second into MySQL, so MySQL should not be the bottleneck; besides, the table has no indexes.

Re: Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by RS <ti...@163.com>.
Check how long the INSERT SQL statements take to execute, and see whether the bottleneck is on the MySQL side, e.g. large rows being written, slow index maintenance, or other issues.

Or manually simulate the SQL writes yourself and compare the speed.
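For instance, a rough way to measure raw insert speed outside of Flink is a small batch-insert script. A minimal sketch assuming the pymysql driver and the table from the DDL above (host, credentials, and columns come from the original post; the driver choice, row contents, and row count are illustrative assumptions):

import time
import pymysql  # assumed client library; any MySQL driver works similarly

conn = pymysql.connect(host="198.2.2.71", port=3306, user="root",
                       password="root", database="bda")
# 1000 dummy rows matching the detail table's columns
rows = [(str(i), "alarm-%d" % i, "trck-%d" % i) for i in range(1000)]

start = time.time()
with conn.cursor() as cur:
    # executemany batches the rows into a single multi-row INSERT
    cur.executemany(
        "INSERT INTO detail (id, alarm_id, trck_id) VALUES (%s, %s, %s)", rows)
conn.commit()
print("rows/sec:", len(rows) / (time.time() - start))

Comparing this number with the job's observed 6 rows per second should show whether MySQL itself is the slow part.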

On 2020-07-25 10:20:35, "小学生" <20...@qq.com> wrote:
>Hello, thank you for your answer, but after I added it the way you suggested, the write rate to MySQL only went up to about 8 rows per second, which still falls short of what I need.

Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by 小学生 <20...@qq.com>.
Hello, thank you for your answer, but after I tested with your suggestion added, the write rate to MySQL only went up to about 8 rows per second, which still falls short of what I need.

Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by WeiXubin <18...@163.com>.
Hi,
You can try rewriting the url to add rewriteBatchedStatements=true, like this:
jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true

By default the MySQL JDBC driver ignores executeBatch(): it splits a group of SQL statements that were meant to be executed as a batch and sends them to MySQL one at a time, which directly results in poor performance. Only when the rewriteBatchedStatements parameter is set to true will the driver actually execute the SQL in batches.
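Applied to the DDL from the original post, only the 'url' option changes. A minimal sketch (all other options kept as in the original code):

# Only the 'url' option changes: rewriteBatchedStatements lets the driver
# send the buffered rows as real batches instead of one statement at a time
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
  id VARCHAR,
  alarm_id VARCHAR,
  trck_id VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = 'detail',
  'username' = 'root',
  'password' = 'root',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '2s'
)
"""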

Best regards,
weixubin




Re: Re: flink1.11: query results written to MySQL at only a few rows per second

Posted by 咿咿呀呀 <20...@qq.com>.
Hello, regarding the application logs or MySQL you mentioned, how exactly should I go about checking them? Thanks.


