Posted to user-zh@flink.apache.org by lgs <99...@qq.com> on 2020/07/21 07:34:29 UTC

flink1.11 pyflink stream job exits

Running python flink_cep_example.py exits after a few seconds, but the job should keep running and never exit.
The code is below; it uses MATCH_RECOGNIZE:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    s_env = StreamExecutionEnvironment.get_execution_environment()
    b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings)
    configuration = st_env.get_config().get_configuration()
    configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")

    s_env.set_parallelism(1)

    kafka_source = """CREATE TABLE source (
         flow_name STRING,
         flow_id STRING,
         component STRING,
         filename STRING,
         event_time TIMESTAMP(3),
         WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
         'connector' = 'kafka',
         'topic' = 'cep',
         'properties.bootstrap.servers' = 'localhost:9092',
         'format' = 'json',
         'scan.startup.mode' = 'latest-offset'
        )"""



    postgres_sink = """
        CREATE TABLE cep_result (
        `filename`         STRING,
        `start_tstamp`          TIMESTAMP(3),
        `end_tstamp`           TIMESTAMP(3)
        ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
        'connector.table' = 'cep_result',
        'connector.driver' = 'org.postgresql.Driver',
        'connector.username' = 'postgres',
        'connector.password' = 'my_password',
        'connector.write.flush.max-rows' = '1'
        )
        """

    st_env.sql_update(kafka_source)
    st_env.sql_update(postgres_sink)

    postgres_sink_sql = '''
        INSERT INTO cep_result
        SELECT *
        FROM source
            MATCH_RECOGNIZE (
                PARTITION BY filename
                ORDER BY event_time
                MEASURES
                    (A.event_time) AS start_tstamp,
                    (D.event_time) AS end_tstamp
                ONE ROW PER MATCH
                AFTER MATCH SKIP PAST LAST ROW
                PATTERN (A B C D)
                DEFINE
                    A AS component = 'XXX',
                    B AS component = 'YYY',
                    C AS component = 'ZZZ',
                    D AS component = 'WWW'
            ) MR
    '''

    sql_result = st_env.execute_sql(postgres_sink_sql)



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 pyflink stream job exits

Posted by Xingbo Huang <hx...@gmail.com>.
Yes. execute is what was used in 1.10 and earlier; execute_sql is the recommended method from 1.11 onward.

Best,
Xingbo
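
The older path mentioned in this reply can be sketched as follows. This is a minimal illustration, not code from the thread: `run_legacy` is a hypothetical helper name, and it assumes a Flink 1.10/1.11 table environment where `sql_update` only buffers INSERT statements and `execute` submits them and blocks, which matches what the original poster observed.

```python
def run_legacy(st_env, insert_statements, job_name="job"):
    """Pre-1.11 style submission (hypothetical helper, not part of PyFlink).

    `st_env` is expected to be a StreamTableEnvironment.
    """
    for stmt in insert_statements:
        # Assumption: for INSERT statements, sql_update() only buffers the
        # statement; nothing is submitted yet.
        st_env.sql_update(stmt)
    # execute() submits the buffered statements as one job. Per the thread,
    # the script then keeps running instead of exiting.
    return st_env.execute(job_name)
```

With the code from the original post, this would be called as `run_legacy(st_env, [postgres_sink_sql], "job")` after the two CREATE TABLE statements have been registered.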

lgs <99...@qq.com> wrote on Tue, Jul 21, 2020 at 3:57 PM:

> Thanks. It works after adding that.
>
> Switching back to the original sql_update followed by st_env.execute("job") also seems to work.

Re: flink1.11 pyflink stream job exits

Posted by lgs <99...@qq.com>.
Thanks. It works after adding that.

Switching back to the original sql_update followed by st_env.execute("job") also seems to work.


Re: flink1.11 pyflink stream job exits

Posted by Xingbo Huang <hx...@gmail.com>.
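Hi,
execute_sql is an asynchronous, non-blocking method, so you need to add the following line at the end of your code:
sql_result.get_job_client().get_job_execution_result().result()
I have created a JIRA issue for this [1].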

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo
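
The fix in this reply can be wrapped in a small helper. This is a minimal sketch under stated assumptions: `wait_for_job` is a hypothetical name, the call chain is the one quoted above, and the `None` check assumes that `get_job_client()` returns `None` for statements that start no job (for example DDL).

```python
def wait_for_job(table_result):
    """Block until the job submitted by execute_sql() finishes.

    `table_result` is expected to be the object returned by
    st_env.execute_sql(...). Hypothetical helper, not part of PyFlink.
    """
    job_client = table_result.get_job_client()
    if job_client is None:
        # Assumption: statements that start no job have no job client,
        # so there is nothing to wait for.
        return None
    # get_job_execution_result() returns a future; result() blocks on it.
    return job_client.get_job_execution_result().result()
```

With the code from the original post, usage would be `sql_result = st_env.execute_sql(postgres_sink_sql)` followed by `wait_for_job(sql_result)`. For an unbounded streaming job such as the Kafka source here, the call is expected to return only when the job is cancelled or fails, which is what keeps the script from exiting.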
