Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/07/14 07:30:41 UTC

pyFlink 1.11 streaming job example

Hi,
I am trying to get a simple streaming job running in pyFlink and to
understand the new 1.11 API. I just want to read from and write to Kafka
topics.
Previously I was using t_env.execute("jobname"), register_table_source()
and register_table_sink(), but in 1.11 all three were deprecated, with the
deprecation warning pointing to execute_sql() as the replacement.
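
For reference, the old pattern looked roughly like this (a from-memory
sketch, not my exact code; source and sink stand for the TableSource and
TableSink instances I was registering, and "mySource"/"mySink" are
placeholder names):

# 1.10 style, all three calls deprecated in 1.11:
t_env.register_table_source("mySource", source)  # register a TableSource under a name
t_env.register_table_sink("mySink", sink)        # register a TableSink under a name
t_env.execute("jobname")                         # build and submit the job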

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)

ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` VARCHAR,
        `time_st` TIMESTAMP(3),
        `data` DOUBLE,
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'properties.group.id' = 'myGroup',
        'format' = 'json'
    )
"""

ddl_sink = f"""
    CREATE TABLE {OUTPUT_TABLE} (
        `monitorId` VARCHAR,
        `max` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{OUTPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")


This gives me the error:
: java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.

I am aware this is lazily evaluated, so is there some SQL-API equivalent
of t_env.execute() that I should be calling to actually submit the job?

Thanks,
Manas

Re: pyFlink 1.11 streaming job example

Posted by Manas Kale <ma...@gmail.com>.
Thank you Xingbo, this will certainly help!

Re: pyFlink 1.11 streaming job example

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,

I have created an issue [1] to add the related documentation.

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo

Re: pyFlink 1.11 streaming job example

Posted by Manas Kale <ma...@gmail.com>.
Thank you for the quick reply, Xingbo!
Is there a documented web page example that I can refer to in the future
for the latest pyFlink 1.11 API? I couldn't find anything related to
awaiting asynchronous results.

Thanks,
Manas

Re: pyFlink 1.11 streaming job example

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,


I tested your code and there are no errors. Because execute_sql() is an
asynchronous method, you need to wait for the job through the TableResult
it returns. You can try the following code:


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def test():
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = StreamTableEnvironment.create(
        exec_env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )

    INPUT_TABLE = "test"
    INPUT_TOPIC = "test"
    # Kafka broker address; brokers usually listen on 9092
    # (2181 is ZooKeeper's port, not a broker)
    LOCAL_KAFKA = "localhost:9092"
    OUTPUT_TABLE = "test_output"
    OUTPUT_TOPIC = "test_output"
    ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            `monitorId` VARCHAR,
            `time_st` TIMESTAMP(3),
            `data` DOUBLE,
            WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'properties.group.id' = 'myGroup',
            'format' = 'json'
        )
    """

    ddl_sink = f"""
        CREATE TABLE {OUTPUT_TABLE} (
            `monitorId` VARCHAR,
            `max` DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{OUTPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
    """
    t_env.execute_sql(ddl_source)
    t_env.execute_sql(ddl_sink)

    result = t_env.execute_sql(f"""
        INSERT INTO {OUTPUT_TABLE}
        SELECT monitorId, data
        FROM {INPUT_TABLE}
    """)
    # Block until the job terminates; for an unbounded Kafka job this
    # waits until the job is cancelled or fails.
    result.get_job_client().get_job_execution_result().result()


if __name__ == '__main__':
    test()
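
If you want to submit several INSERT statements as a single job, 1.11 also
added StatementSet; here is a minimal sketch along the same lines (assuming
the same t_env and table names as in test() above):

# Group one or more INSERTs into a single job; execute() again returns a
# TableResult that you can wait on in the same way.
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(
    f"INSERT INTO {OUTPUT_TABLE} SELECT monitorId, data FROM {INPUT_TABLE}")
result = stmt_set.execute()
result.get_job_client().get_job_execution_result().result()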


Best,
Xingbo
