You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小学生 <20...@qq.com> on 2020/07/21 08:32:46 UTC

flink table同时作为写出及输入时下游无数据

各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。




from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings



env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
&nbsp;id VARCHAR,&nbsp; &nbsp;&nbsp;
&nbsp;alarm_id VARCHAR,&nbsp; &nbsp;&nbsp;
&nbsp;trck_id VARCHAR&nbsp;


) WITH (
&nbsp;'connector' = 'kafka',
&nbsp;'topic' = 'gg',&nbsp; &nbsp;&nbsp;
&nbsp;'scan.startup.mode' = 'specific-offsets',&nbsp;&nbsp;
&nbsp;'scan.startup.specific-offsets'='partition:1,offset:0',
&nbsp;'properties.bootstrap.servers' = '****',
&nbsp;'format' = 'json'&nbsp;
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
&nbsp;alarm_id VARCHAR,&nbsp; &nbsp;&nbsp;
&nbsp;trck_id VARCHAR&nbsp;
&nbsp;
) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp;'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
&nbsp;'table-name' = 'g_unit',&nbsp; &nbsp;&nbsp;
&nbsp;'username' = 'root',
&nbsp;'password' = 'root',
&nbsp;'sink.buffer-flush.interval' = '1s'&nbsp; &nbsp; &nbsp;
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
&nbsp;alarm_id VARCHAR,&nbsp; &nbsp;&nbsp;
&nbsp;trck_id VARCHAR&nbsp;
) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp;'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
&nbsp;'table-name' = 'g_summary',&nbsp;&nbsp;
&nbsp;'username' = 'root',
&nbsp;'password' = 'root',
&nbsp;'sink.buffer-flush.interval' = '1s'
)
"""

t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)


sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl'''



stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)


stmt_set.execute().get_job_client().get_job_execution_result().result()