You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小学霸 <st...@qq.com> on 2020/07/13 09:50:22 UTC

flink 1.11写入mysql问题

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
&nbsp;id VARCHAR,&nbsp; &nbsp;
&nbsp;alarm_id VARCHAR,&nbsp; &nbsp;
&nbsp;trck_id VARCHAR


) WITH (
&nbsp;'connector' = 'kafka',
&nbsp;'topic' = 'alarm_test_g',&nbsp; &nbsp;
&nbsp;'scan.startup.mode' = 'earliest-offset', 
&nbsp;'properties.bootstrap.servers' = '10.2.2.73:2181',
&nbsp;'properties.bootstrap.servers' = '10.2.2.73:9092',
&nbsp;'format' = 'json'&nbsp; 
)
"""

sink="""
CREATE TABLE g_source_tab (
&nbsp;id VARCHAR,&nbsp; &nbsp; 
&nbsp;alarm_id VARCHAR,&nbsp; &nbsp; &nbsp;
&nbsp;trck_id VARCHAR


) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp;'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',&nbsp; 
&nbsp;'table-name' = 'g',&nbsp; &nbsp;
&nbsp;'username' = 'root',
&nbsp;'password' = '123456t',
&nbsp;'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
&nbsp; &nbsp; &nbsp; &nbsp; .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re: flink 1.11写入mysql问题

Posted by Jark Wu <im...@gmail.com>.
请问你是怎么提交的作业呢? 是在本地 IDEA 里面执行的,还是打成 jar 包后提交到集群运行的呢?

On Mon, 13 Jul 2020 at 17:58, 小学霸 <st...@qq.com> wrote:

> 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> source="""
> CREATE TABLE kafka_source_tab (
> &nbsp;id VARCHAR,&nbsp; &nbsp;
> &nbsp;alarm_id VARCHAR,&nbsp; &nbsp;
> &nbsp;trck_id VARCHAR
>
>
> ) WITH (
> &nbsp;'connector' = 'kafka',
> &nbsp;'topic' = 'alarm_test_g',&nbsp; &nbsp;
> &nbsp;'scan.startup.mode' = 'earliest-offset',
> &nbsp;'properties.bootstrap.servers' = '10.2.2.73:2181',
> &nbsp;'properties.bootstrap.servers' = '10.2.2.73:9092',
> &nbsp;'format' = 'json'&nbsp;
> )
> """
>
> sink="""
> CREATE TABLE g_source_tab (
> &nbsp;id VARCHAR,&nbsp; &nbsp;
> &nbsp;alarm_id VARCHAR,&nbsp; &nbsp; &nbsp;
> &nbsp;trck_id VARCHAR
>
>
> ) WITH (
> &nbsp;'connector' = 'jdbc',
> &nbsp;'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',&nbsp;
> &nbsp;'table-name' = 'g',&nbsp; &nbsp;
> &nbsp;'username' = 'root',
> &nbsp;'password' = '123456t',
> &nbsp;'sink.buffer-flush.interval' = '1s'
> )
> """
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> t_env.execute_sql(source)
> t_env.execute_sql(sink)
>
>
> source = t_env.from_path("kafka_source_tab")\
> &nbsp; &nbsp; &nbsp; &nbsp; .select("id,alarm_id,trck_id")
> source.execute_insert("g_source_tab")