Posted to user-zh@flink.apache.org by lgs <99...@qq.com> on 2020/07/21 07:34:29 UTC
flink1.11 pyflink stream job exits
python flink_cep_example.py exits after a few seconds, but it should keep running and never exit.
The code is below; it uses MATCH_RECOGNIZE:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings)
configuration = st_env.get_config().get_configuration()
configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")

s_env.set_parallelism(1)
kafka_source = """CREATE TABLE source (
    flow_name STRING,
    flow_id STRING,
    component STRING,
    filename STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cep',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)"""
postgres_sink = """
CREATE TABLE cep_result (
    `filename` STRING,
    `start_tstamp` TIMESTAMP(3),
    `end_tstamp` TIMESTAMP(3)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
    'connector.table' = 'cep_result',
    'connector.driver' = 'org.postgresql.Driver',
    'connector.username' = 'postgres',
    'connector.password' = 'my_password',
    'connector.write.flush.max-rows' = '1'
)
"""
st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)
postgres_sink_sql = '''
INSERT INTO cep_result
SELECT *
FROM source
MATCH_RECOGNIZE (
    PARTITION BY filename
    ORDER BY event_time
    MEASURES
        (A.event_time) AS start_tstamp,
        (D.event_time) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (A B C D)
    DEFINE
        A AS component = 'XXX',
        B AS component = 'YYY',
        C AS component = 'ZZZ',
        D AS component = 'WWW'
) MR
'''
sql_result = st_env.execute_sql(postgres_sink_sql)
Re: flink1.11 pyflink stream job exits
Posted by Xingbo Huang <hx...@gmail.com>.
Yes. execute is what was used in 1.10 and earlier; execute_sql is the recommended way from 1.11 on.
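
Side by side, the two styles look roughly like this (a minimal sketch reusing st_env and postgres_sink_sql from your script; in 1.11 sql_update/execute still work but are deprecated):

# 1.10 style: sql_update() only buffers the statement; the job is
# submitted by the blocking execute() call, so the script stays alive.
st_env.sql_update(postgres_sink_sql)
st_env.execute("job")

# 1.11 style: execute_sql() submits the job right away and returns a
# TableResult without blocking, so you must wait on the job explicitly.
sql_result = st_env.execute_sql(postgres_sink_sql)
sql_result.get_job_client().get_job_execution_result().result()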
Best,
Xingbo
lgs <99...@qq.com> wrote on Tue, Jul 21, 2020 at 3:57 PM:
> Thanks. It works after adding that line.
>
> Changing back to the original sql_update followed by st_env.execute("job")
> also seems to work.
Re: flink1.11 pyflink stream job exits
Posted by lgs <99...@qq.com>.
Thanks. It works after adding that line.
Changing back to the original sql_update followed by st_env.execute("job") also seems to work.
Re: flink1.11 pyflink stream job exits
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
execute_sql is an asynchronous, non-blocking method, so you need to add the following at the end of your code:
sql_result.get_job_client().get_job_execution_result().result()
I have opened a JIRA ticket for this [1].
[1] https://issues.apache.org/jira/browse/FLINK-18598
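
In context, the tail of the script from your mail becomes (a minimal sketch; everything before execute_sql is unchanged):

# execute_sql() submits the INSERT job and returns a TableResult immediately.
sql_result = st_env.execute_sql(postgres_sink_sql)
# Block until the (unbounded) streaming job finishes, which keeps the
# Python process from exiting after a few seconds.
sql_result.get_job_client().get_job_execution_result().result()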
Best,
Xingbo
lgs <99...@qq.com> wrote on Tue, Jul 21, 2020 at 3:35 PM:
> [original message quoted above; trimmed]