Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/07/14 07:30:41 UTC
pyFlink 1.11 streaming job example
Hi,
I am trying to get a simple streaming job running in pyFlink and understand
the new 1.11 API. I just want to read from and write to kafka topics.
Previously I was using t_env.execute("jobname"), register_table_source()
and register_table_sink(), but in 1.11 all three are deprecated and the
deprecation warnings point to execute_sql() as the replacement.
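As far as I can tell from the deprecation warnings, the mapping is roughly the
following ("src" and "snk" are just placeholder names I'm using here):

# 1.10 style                                   1.11 style
# t_env.register_table_source("src", ...)  ->  t_env.execute_sql("CREATE TABLE src (...) WITH (...)")
# t_env.register_table_sink("snk", ...)    ->  t_env.execute_sql("CREATE TABLE snk (...) WITH (...)")
# t_env.execute("jobname")                 ->  ??? (this is the part I'm unsure about)

This is my current attempt: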
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)

ddl_source = f"""
    CREATE TABLE {INPUT_TABLE} (
        `monitorId` VARCHAR,
        `time_st` TIMESTAMP(3),
        WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
        `data` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{INPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'properties.group.id' = 'myGroup',
        'format' = 'json'
    )
"""

ddl_sink = f"""
    CREATE TABLE {OUTPUT_TABLE} (
        `monitorId` VARCHAR,
        `max` DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{OUTPUT_TOPIC}',
        'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
        'format' = 'json'
    )
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")
This gives me the error:
: java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot generate StreamGraph.
I am aware this is lazily evaluated, so is there some equivalent SQL
statement for t_env.execute() that I should be calling?
Thanks,
Manas
Re: pyFlink 1.11 streaming job example
Posted by Manas Kale <ma...@gmail.com>.
Thank you Xingbo, this will certainly help!
Re: pyFlink 1.11 streaming job example
Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,
I have created an issue[1] to add the related documentation.
[1] https://issues.apache.org/jira/browse/FLINK-18598
Best,
Xingbo
Re: pyFlink 1.11 streaming job example
Posted by Manas Kale <ma...@gmail.com>.
Thank you for the quick reply, Xingbo!
Is there a documentation page with examples that I can refer to in the future
for the latest pyFlink 1.11 API? I couldn't find anything about awaiting
asynchronous results.
Thanks,
Manas
Re: pyFlink 1.11 streaming job example
Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,
I tested your code and there are no errors. Because execute_sql is an
asynchronous method, you need to wait on the job through the TableResult it
returns. You can try the following code:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def test():
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = StreamTableEnvironment.create(
        exec_env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )

    INPUT_TABLE = "test"
    INPUT_TOPIC = "test"
    LOCAL_KAFKA = "localhost:2181"  # note: Kafka brokers usually listen on 9092 (2181 is ZooKeeper); adjust as needed
    OUTPUT_TABLE = "test_output"
    OUTPUT_TOPIC = "test_output"

    ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            `monitorId` VARCHAR,
            `time_st` TIMESTAMP(3),
            WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
            `data` DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'properties.group.id' = 'myGroup',
            'format' = 'json'
        )
    """

    ddl_sink = f"""
        CREATE TABLE {OUTPUT_TABLE} (
            `monitorId` VARCHAR,
            `max` DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{OUTPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
    """

    # CREATE TABLE statements only register the tables; they finish immediately.
    t_env.execute_sql(ddl_source)
    t_env.execute_sql(ddl_sink)

    # execute_sql submits the INSERT job asynchronously and returns a TableResult.
    result = t_env.execute_sql(f"""
        INSERT INTO {OUTPUT_TABLE}
        SELECT monitorId, data
        FROM {INPUT_TABLE}
    """)
    # Block until the streaming job finishes (or fails) so the script does not exit early.
    result.get_job_client().get_job_execution_result().result()


if __name__ == '__main__':
    test()
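As a side note, if you later have more than one INSERT in the same job, my
understanding is that you can group them with a StatementSet and then wait on
the single TableResult it returns. A rough, untested sketch along the same
lines as the code above:

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT monitorId, data
    FROM {INPUT_TABLE}
""")
# execute() submits all added statements as one job and returns a TableResult
result = stmt_set.execute()
result.get_job_client().get_job_execution_result().result()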
Best,
Xingbo