Posted to dev@flink.apache.org by Etienne Chauchot <ec...@apache.org> on 2022/11/07 14:29:54 UTC

[blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article about 
migrating a real-life batch pipeline from the DataSet API to the 
DataStream API:

https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne


Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,

Any feedback on whether the API benchmark article proposed below would be of interest?

Best

Etienne

On 09/11/2022 at 12:18, Etienne Chauchot wrote:
>
> Hi,
>
> And by the way, I was planning on writing another article to compare
> the performance of the DataSet, DataStream and SQL APIs over TPCDS
> query3. I thought that I could run the pipelines on an Amazon EMR
> cluster with different data sizes: 1 GB, 100 GB, 1 TB.
>
> Would it be worth it? What do you think?
>
> Best
>
> Etienne
>
> On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>>
>> Hi Yun Gao,
>>
>> Thanks for your email and your review!
>>
>> My comments are inline.
>>
>> On 08/11/2022 at 06:51, Yun Gao wrote:
>>> Hi Etienne,
>>>
>>> Many thanks for the article! Flink is indeed steadily increasing its
>>> ability to unify batch / stream processing under the same API, and
>>> it's a great pleasure that more and more users are trying this
>>> functionality. But I also have some questions regarding some details.
>>>
>>> First, IMO, in the long run Flink will have two unified APIs, namely
>>> the Table / SQL API and the DataStream API. Users can express their
>>> computation logic with these two APIs for both bounded and unbounded
>>> data processing.
>>
>>
>> Yes, that is also what I understood from the discussions and JIRAs.
>> And IMHO, reducing the number of APIs to 2 was the right move.
>>
>>
>>> Under the hood, Flink provides two execution modes: the streaming
>>> mode works with both bounded and unbounded data, and executes as
>>> incremental processing based on state; the batch mode works only with
>>> bounded data, and executes level by level, similar to traditional
>>> batch processing frameworks. Users can switch the execution mode via
>>> EnvironmentSettings.inBatchMode() or
>>> StreamExecutionEnvironment.setRuntimeMode().
>>
>> As recommended in the Flink docs [1], I enabled the batch mode as I
>> thought it would be more efficient on my bounded pipeline, but as a
>> matter of fact the streaming mode seems to be more efficient for my
>> use case. I'll test with higher volumes to confirm.
>>
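>> For reference, here is a minimal sketch of how I switch the mode in
>> the DataStream API (assuming Flink 1.16; BATCH is opt-in, STREAMING is
>> the default):
>>
>> import org.apache.flink.api.common.RuntimeExecutionMode;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> // Run the bounded pipeline with batch semantics (sorted keyBy inputs,
>> // staged execution) instead of the default incremental streaming mode.
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);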
>>
>>>
>>> Specifically for DataStream, as implemented in FLIP-140, all the
>>> existing DataStream operations currently support the batch execution
>>> mode in a unified way [1]: data is sorted on the keyBy() edges
>>> according to the key, so the following operations like reduce() can
>>> receive all the data belonging to the same key consecutively and
>>> directly reduce the records of the same key without maintaining
>>> intermediate state. In this way users can write the same code for
>>> both streaming and batch processing.
>>
>>
>> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
>> pipeline will work with no modification if I plug an unbounded source
>> into it.
>>
>>>
>>>
>>> # Regarding the migration of Join / Reduce
>>>
>>> First, I think Reduce is always supported: users can write
>>> dataStream.keyBy().reduce(xx) directly, and if the batch execution
>>> mode is set, the reduce will not be executed in an incremental way;
>>> instead it acts much like sort-based aggregation in a traditional
>>> batch processing framework.
>>>
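>>> For example, a minimal sketch of such a reduce (hypothetical
>>> Tuple2<String, Long> records and an assumed "input" stream; in batch
>>> mode the runtime sorts by key, so each key's records arrive
>>> consecutively and no intermediate state is kept):
>>>
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>
>>> DataStream<Tuple2<String, Long>> sums = input
>>>         .keyBy(t -> t.f0)                                // group by key
>>>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)); // sum per key
>>>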
>>> Regarding Join, the issue of FLINK-22587 indeed exists: the current
>>> join has to be bound to a window, and the GlobalWindow does not work
>>> properly. But with some more effort, users currently do not need to
>>> re-write the whole join from scratch: they can write a dedicated
>>> window assigner that assigns all the records to the same window
>>> instance and returns EventTimeTrigger.create() as the default
>>> event-time trigger [2]. Then this works:
>>>
>>> source1.join(source2)
>>>                 .where(a -> a.f0)
>>>                 .equalTo(b -> b.f0)
>>>                 .window(new EndOfStreamWindows())
>>>                 .apply(xxxx);
>>>
>>> It does not require records to have event time attached, since the
>>> window trigger relies only on the time range of the window, and the
>>> assignment does not need event time either.
>>>
>>> The behavior of the join is also similar to a sort-based join if
>>> batch mode is enabled.
>>>
>>> Of course it is not user-friendly to require this workaround, and
>>> we'll try to fix this issue in 1.17.
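>>>
>>> A rough sketch of such an assigner (the real EndOfStreamWindows lives
>>> in flink-ml, see [2]; this version is only illustrative, not the
>>> exact source):
>>>
>>> import java.util.Collection;
>>> import java.util.Collections;
>>> import org.apache.flink.api.common.ExecutionConfig;
>>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
>>> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>
>>> public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
>>>
>>>     // One window instance covering the whole time range.
>>>     private static final TimeWindow GLOBAL_WINDOW =
>>>             new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
>>>
>>>     @Override
>>>     public Collection<TimeWindow> assignWindows(
>>>             Object element, long timestamp, WindowAssignerContext context) {
>>>         return Collections.singletonList(GLOBAL_WINDOW); // same window for all
>>>     }
>>>
>>>     @Override
>>>     public Trigger<Object, TimeWindow> getDefaultTrigger(
>>>             StreamExecutionEnvironment env) {
>>>         return EventTimeTrigger.create(); // fires when the input ends
>>>     }
>>>
>>>     @Override
>>>     public TypeSerializer<TimeWindow> getWindowSerializer(
>>>             ExecutionConfig executionConfig) {
>>>         return new TimeWindow.Serializer();
>>>     }
>>>
>>>     @Override
>>>     public boolean isEventTime() {
>>>         return true;
>>>     }
>>> }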
>>
>>
>> Yes, this is a better workaround than the manual state-based join 
>> that I proposed. I tried it and it works perfectly with similar 
>> performance. Thanks.
>>
>>>
>>> # Regarding support of Sort / Limit
>>>
>>> Currently these two operators are indeed not supported directly in
>>> the DataStream API. One initial thought for these two operations is
>>> that users may convert the DataStream to the Table API and use the
>>> Table API for these two operators:
>>>
>>> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
>>> Table tableXX = tableEnv.fromDataStream(dataStream);
>>> tableXX.orderBy($("a").asc());
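>>>
>>> Fleshed out, the round trip could look like this (a minimal sketch
>>> with hypothetical names; in the Table API, orderBy() plus fetch()
>>> plays the role of sort plus limit):
>>>
>>> import static org.apache.flink.table.api.Expressions.$;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.types.Row;
>>>
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>> Table table = tableEnv.fromDataStream(dataStream);
>>> // Sort ascending on column "a" and keep only the first 10 rows.
>>> Table sorted = table.orderBy($("a").asc()).fetch(10);
>>> // Convert back to continue with DataStream operators.
>>> DataStream<Row> result = tableEnv.toDataStream(sorted);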
>>
>>
>> Yes, I knew that workaround, but I decided not to use it because I
>> have a separate SQL-based implementation (for comparison purposes), so
>> I did not want to mix the SQL and DataStream APIs in the same pipeline.
>>
>>>
>>> What do you think about this option? We are also assessing whether
>>> the combination of the DataStream API / Table API is sufficient for
>>> all batch users. Any suggestions are warmly welcome.
>>
>>
>> I guess that outside of my use case of comparing the performance of
>> the 3 Flink APIs (a broader subject than this article), users can
>> easily mix the APIs in the same pipeline. If we really want to have
>> these operations in the DataStream API, maybe wrapping state-based
>> implementations could be a good option if their performance meets our
>> expectations.
>>
>>>
>>> Best,
>>> Yun Gao
>>
>> I'll update the article and the code with your suggestions. Thanks again.
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>>
>>
>> Best
>>
>> Etienne
>>
>>>
>>>
>>> [1] 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2] 
>>> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>>>
>>>
>>>
>>>     ------------------------------------------------------------------
>>>     From: liu ron <ro...@gmail.com>
>>>     Send Time: 2022 Nov. 8 (Tue.) 10:21
>>>     To: dev <de...@flink.apache.org>; Etienne Chauchot
>>>     <ec...@apache.org>; user <us...@flink.apache.org>
>>>     Subject: Re: [blog article] Howto migrate a real-life batch
>>>     pipeline from the DataSet API to the DataStream API
>>>
>>>     Thanks for your post. It looks very good to me, and maybe also
>>>     useful for developers.
>>>
>>>     Best,
>>>     Liudalong
>>>
>>>     yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>>>     Wow, cool!  Thanks for your work.
>>>     It'll definitely be helpful for users that want to migrate
>>>     their batch jobs from the DataSet API to the DataStream API.
>>>
>>>     Best regards,
>>>     Yuxia
>>>
>>>     ----- Original Message -----
>>>     From: "Etienne Chauchot" <ec...@apache.org>
>>>     To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>>>     Sent: Monday, Nov 7, 2022 at 10:29:54 PM
>>>     Subject: [blog article] Howto migrate a real-life batch pipeline from
>>>     the DataSet API to the DataStream API
>>>
>>>     Hi everyone,
>>>
>>>     In case some of you are interested, I just posted a blog article
>>>     about
>>>     migrating a real-life batch pipeline from the DataSet API to the
>>>     DataStream API:
>>>
>>>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>>>
>>>     Best
>>>
>>>     Etienne
>>>
>>>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,

And by the way, I was planning on writing another article to compare the
performance of the DataSet, DataStream and SQL APIs over TPCDS query3. I
thought that I could run the pipelines on an Amazon EMR cluster with
different data sizes: 1 GB, 100 GB, 1 TB.

Would it be worth it? What do you think?

Best

Etienne

On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>
> Hi Yun Gao,
>
> Thanks for your email and your review!
>
> My comments are inline.
>
> On 08/11/2022 at 06:51, Yun Gao wrote:
>> Hi Etienne,
>>
>> Many thanks for the article! Flink is indeed steadily increasing its
>> ability to unify batch / stream processing under the same API, and
>> it's a great pleasure that more and more users are trying this
>> functionality. But I also have some questions regarding some details.
>>
>> First, IMO, in the long run Flink will have two unified APIs, namely
>> the Table / SQL API and the DataStream API. Users can express their
>> computation logic with these two APIs for both bounded and unbounded
>> data processing.
>
>
> Yes, that is also what I understood from the discussions and JIRAs.
> And IMHO, reducing the number of APIs to 2 was the right move.
>
>
>> Under the hood, Flink provides two execution modes: the streaming
>> mode works with both bounded and unbounded data, and executes as
>> incremental processing based on state; the batch mode works only with
>> bounded data, and executes level by level, similar to traditional
>> batch processing frameworks. Users can switch the execution mode via
>> EnvironmentSettings.inBatchMode() or
>> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in the Flink docs [1], I enabled the batch mode as I
> thought it would be more efficient on my bounded pipeline, but as a
> matter of fact the streaming mode seems to be more efficient for my
> use case. I'll test with higher volumes to confirm.
>
>
>>
>> Specifically for DataStream, as implemented in FLIP-140, all the
>> existing DataStream operations currently support the batch execution
>> mode in a unified way [1]: data is sorted on the keyBy() edges
>> according to the key, so the following operations like reduce() can
>> receive all the data belonging to the same key consecutively and
>> directly reduce the records of the same key without maintaining
>> intermediate state. In this way users can write the same code for
>> both streaming and batch processing.
>
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
> pipeline will work with no modification if I plug an unbounded source
> into it.
>
>>
>>
>> # Regarding the migration of Join / Reduce
>>
>> First, I think Reduce is always supported: users can write
>> dataStream.keyBy().reduce(xx) directly, and if the batch execution
>> mode is set, the reduce will not be executed in an incremental way;
>> instead it acts much like sort-based aggregation in a traditional
>> batch processing framework.
>>
>> Regarding Join, the issue of FLINK-22587 indeed exists: the current
>> join has to be bound to a window, and the GlobalWindow does not work
>> properly. But with some more effort, users currently do not need to
>> re-write the whole join from scratch: they can write a dedicated
>> window assigner that assigns all the records to the same window
>> instance and returns EventTimeTrigger.create() as the default
>> event-time trigger [2]. Then this works:
>>
>> source1.join(source2)
>>                 .where(a -> a.f0)
>>                 .equalTo(b -> b.f0)
>>                 .window(new EndOfStreamWindows())
>>                 .apply(xxxx);
>>
>> It does not require records to have event time attached, since the
>> window trigger relies only on the time range of the window, and the
>> assignment does not need event time either.
>>
>> The behavior of the join is also similar to a sort-based join if
>> batch mode is enabled.
>>
>> Of course it is not user-friendly to require this workaround, and
>> we'll try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that 
> I proposed. I tried it and it works perfectly with similar 
> performance. Thanks.
>
>>
>> # Regarding support of Sort / Limit
>>
>> Currently these two operators are indeed not supported directly in
>> the DataStream API. One initial thought for these two operations is
>> that users may convert the DataStream to the Table API and use the
>> Table API for these two operators:
>>
>> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
>> Table tableXX = tableEnv.fromDataStream(dataStream);
>> tableXX.orderBy($("a").asc());
>
>
> Yes, I knew that workaround, but I decided not to use it because I
> have a separate SQL-based implementation (for comparison purposes), so
> I did not want to mix the SQL and DataStream APIs in the same pipeline.
>
>>
>> What do you think about this option? We are also assessing whether
>> the combination of the DataStream API / Table API is sufficient for
>> all batch users. Any suggestions are warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of
> the 3 Flink APIs (a broader subject than this article), users can
> easily mix the APIs in the same pipeline. If we really want to have
> these operations in the DataStream API, maybe wrapping state-based
> implementations could be a good option if their performance meets our
> expectations.
>
>>
>> Best,
>> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2] 
>> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>>
>>
>>
>>     ------------------------------------------------------------------
>>     From: liu ron <ro...@gmail.com>
>>     Send Time: 2022 Nov. 8 (Tue.) 10:21
>>     To: dev <de...@flink.apache.org>; Etienne Chauchot
>>     <ec...@apache.org>; user <us...@flink.apache.org>
>>     Subject: Re: [blog article] Howto migrate a real-life batch
>>     pipeline from the DataSet API to the DataStream API
>>
>>     Thanks for your post. It looks very good to me, and maybe also
>>     useful for developers.
>>
>>     Best,
>>     Liudalong
>>
>>     yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>>     Wow, cool!  Thanks for your work.
>>     It'll definitely be helpful for users that want to migrate
>>     their batch jobs from the DataSet API to the DataStream API.
>>
>>     Best regards,
>>     Yuxia
>>
>>     ----- Original Message -----
>>     From: "Etienne Chauchot" <ec...@apache.org>
>>     To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>>     Sent: Monday, Nov 7, 2022 at 10:29:54 PM
>>     Subject: [blog article] Howto migrate a real-life batch pipeline from
>>     the DataSet API to the DataStream API
>>
>>     Hi everyone,
>>
>>     In case some of you are interested, I just posted a blog article
>>     about
>>     migrating a real-life batch pipeline from the DataSet API to the
>>     DataStream API:
>>
>>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>>
>>     Best
>>
>>     Etienne
>>
>>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Jing Ge <ji...@ververica.com>.
Hi Etienne,

Nice blog! Thanks for sharing!

Best regards,
Jing


On Wed, Nov 9, 2022 at 5:49 PM Etienne Chauchot <ec...@apache.org>
wrote:

> Hi Yun Gao,
>
> FYI I just updated the article after your review:
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
> On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>
> Hi Yun Gao,
>
> Thanks for your email and your review!
>
> My comments are inline.
> On 08/11/2022 at 06:51, Yun Gao wrote:
>
> Hi Etienne,
>
> Many thanks for the article! Flink is indeed steadily increasing its
> ability to unify batch / stream processing under the same API, and it's a
> great pleasure that more and more users are trying this functionality.
> But I also have some questions regarding some details.
>
> First, IMO, in the long run Flink will have two unified APIs, namely the
> Table / SQL API and the DataStream API. Users can express their
> computation logic with these two APIs for both bounded and unbounded data
> processing.
>
>
> Yes, that is also what I understood from the discussions and JIRAs. And
> IMHO, reducing the number of APIs to 2 was the right move.
>
>
> Under the hood, Flink provides two execution modes: the streaming mode
> works with both bounded and unbounded data, and executes as incremental
> processing based on state; the batch mode works only with bounded data,
> and executes level by level, similar to traditional batch processing
> frameworks. Users can switch the execution mode via
> EnvironmentSettings.inBatchMode() or
> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in the Flink docs [1], I enabled the batch mode as I
> thought it would be more efficient on my bounded pipeline, but as a
> matter of fact the streaming mode seems to be more efficient for my use
> case. I'll test with higher volumes to confirm.
>
>
>
> Specifically for DataStream, as implemented in FLIP-140, all the existing
> DataStream operations currently support the batch execution mode in a
> unified way [1]: data is sorted on the keyBy() edges according to the
> key, so the following operations like reduce() can receive all the data
> belonging to the same key consecutively and directly reduce the records
> of the same key without maintaining intermediate state. In this way users
> can write the same code for both streaming and batch processing.
>
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
> pipeline will work with no modification if I plug an unbounded source
> into it.
>
>
>
> # Regarding the migration of Join / Reduce
>
> First, I think Reduce is always supported: users can write
> dataStream.keyBy().reduce(xx) directly, and if the batch execution mode
> is set, the reduce will not be executed in an incremental way; instead it
> acts much like sort-based aggregation in a traditional batch processing
> framework.
>
> Regarding Join, the issue of FLINK-22587 indeed exists: the current join
> has to be bound to a window, and the GlobalWindow does not work properly.
> But with some more effort, users currently do not need to re-write the
> whole join from scratch: they can write a dedicated window assigner that
> assigns all the records to the same window instance and returns
> EventTimeTrigger.create() as the default event-time trigger [2]. Then
> this works:
>
> source1.join(source2)
>                 .where(a -> a.f0)
>                 .equalTo(b -> b.f0)
>                 .window(new EndOfStreamWindows())
>                 .apply(xxxx);
>
> It does not require records to have event time attached, since the window
> trigger relies only on the time range of the window, and the assignment
> does not need event time either.
>
> The behavior of the join is also similar to a sort-based join if batch
> mode is enabled.
>
> Of course it is not user-friendly to require this workaround, and we'll
> try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that I
> proposed. I tried it and it works perfectly with similar performance.
> Thanks.
>
>
> # Regarding support of Sort / Limit
>
> Currently these two operators are indeed not supported directly in the
> DataStream API. One initial thought for these two operations is that
> users may convert the DataStream to the Table API and use the Table API
> for these two operators:
>
> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
> Table tableXX = tableEnv.fromDataStream(dataStream);
> tableXX.orderBy($("a").asc());
>
>
> Yes, I knew that workaround, but I decided not to use it because I have a
> separate SQL-based implementation (for comparison purposes), so I did not
> want to mix the SQL and DataStream APIs in the same pipeline.
>
>
> What do you think about this option? We are also assessing whether the
> combination of the DataStream API / Table API is sufficient for all batch
> users. Any suggestions are warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of the 3
> Flink APIs (a broader subject than this article), users can easily mix
> the APIs in the same pipeline. If we really want to have these operations
> in the DataStream API, maybe wrapping state-based implementations could
> be a good option if their performance meets our expectations.
>
>
>
> Best,
> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>
>
>
> ------------------------------------------------------------------
> From: liu ron <ro...@gmail.com>
> Send Time: 2022 Nov. 8 (Tue.) 10:21
> To: dev <de...@flink.apache.org>; Etienne Chauchot
> <ec...@apache.org>; user <us...@flink.apache.org>
> Subject: Re: [blog article] Howto migrate a real-life batch pipeline from
> the DataSet API to the DataStream API
>
> Thanks for your post. It looks very good to me, and maybe also useful for
> developers.
>
> Best,
> Liudalong
>
> yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
> Wow, cool!  Thanks for your work.
> It'll definitely be helpful for users that want to migrate their batch
> jobs from the DataSet API to the DataStream API.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Etienne Chauchot" <ec...@apache.org>
> To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
> Sent: Monday, Nov 7, 2022 at 10:29:54 PM
> Subject: [blog article] Howto migrate a real-life batch pipeline from the
> DataSet API to the DataStream API
>
> Hi everyone,
>
> In case some of you are interested, I just posted a blog article about
> migrating a real-life batch pipeline from the DataSet API to the
> DataStream API:
>
>
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
>
>
>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Yun Gao,

FYI I just updated the article after your review: 
https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne

On 09/11/2022 at 10:04, Etienne Chauchot wrote:
>
> Hi Yun Gao,
>
> Thanks for your email and your review!
>
> My comments are inline.
>
> On 08/11/2022 at 06:51, Yun Gao wrote:
>> Hi Etienne,
>>
>> Many thanks for the article! Flink is indeed steadily increasing its
>> ability to unify batch / stream processing under the same API, and
>> it's a great pleasure that more and more users are trying this
>> functionality. But I also have some questions regarding some details.
>>
>> First, IMO, in the long run Flink will have two unified APIs, namely
>> the Table / SQL API and the DataStream API. Users can express their
>> computation logic with these two APIs for both bounded and unbounded
>> data processing.
>
>
> Yes, that is also what I understood from the discussions and JIRAs.
> And IMHO, reducing the number of APIs to 2 was the right move.
>
>
>> Under the hood, Flink provides two execution modes: the streaming
>> mode works with both bounded and unbounded data, and executes as
>> incremental processing based on state; the batch mode works only with
>> bounded data, and executes level by level, similar to traditional
>> batch processing frameworks. Users can switch the execution mode via
>> EnvironmentSettings.inBatchMode() or
>> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in the Flink docs [1], I enabled the batch mode as I
> thought it would be more efficient on my bounded pipeline, but as a
> matter of fact the streaming mode seems to be more efficient for my
> use case. I'll test with higher volumes to confirm.
>
>
>>
>> Specifically for DataStream, as implemented in FLIP-140, all the
>> existing DataStream operations currently support the batch execution
>> mode in a unified way [1]: data is sorted on the keyBy() edges
>> according to the key, so the following operations like reduce() can
>> receive all the data belonging to the same key consecutively and
>> directly reduce the records of the same key without maintaining
>> intermediate state. In this way users can write the same code for
>> both streaming and batch processing.
>
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
> pipeline will work with no modification if I plug an unbounded source
> into it.
>
>>
>>
>> # Regarding the migration of Join / Reduce
>>
>> First, I think Reduce is always supported: users can write
>> dataStream.keyBy().reduce(xx) directly, and if the batch execution
>> mode is set, the reduce will not be executed in an incremental way;
>> instead it acts much like sort-based aggregation in a traditional
>> batch processing framework.
>>
>> Regarding Join, the issue of FLINK-22587 indeed exists: the current
>> join has to be bound to a window, and the GlobalWindow does not work
>> properly. But with some more effort, users currently do not need to
>> re-write the whole join from scratch: they can write a dedicated
>> window assigner that assigns all the records to the same window
>> instance and returns EventTimeTrigger.create() as the default
>> event-time trigger [2]. Then this works:
>>
>> source1.join(source2)
>>                 .where(a -> a.f0)
>>                 .equalTo(b -> b.f0)
>>                 .window(new EndOfStreamWindows())
>>                 .apply(xxxx);
>>
>> It does not require records to have event time attached, since the
>> window trigger relies only on the time range of the window, and the
>> assignment does not need event time either.
>>
>> The behavior of the join is also similar to a sort-based join if
>> batch mode is enabled.
>>
>> Of course it is not user-friendly to require this workaround, and
>> we'll try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that 
> I proposed. I tried it and it works perfectly with similar 
> performance. Thanks.
>
>>
>> # Regarding support of Sort / Limit
>>
>> Currently these two operators are indeed not supported directly in
>> the DataStream API. One initial thought for these two operations is
>> that users may convert the DataStream to the Table API and use the
>> Table API for these two operators:
>>
>> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
>> Table tableXX = tableEnv.fromDataStream(dataStream);
>> tableXX.orderBy($("a").asc());
>
>
> Yes, I knew that workaround, but I decided not to use it because I
> have a separate SQL-based implementation (for comparison purposes), so
> I did not want to mix the SQL and DataStream APIs in the same pipeline.
>
>>
>> What do you think about this option? We are also assessing whether
>> the combination of the DataStream API / Table API is sufficient for
>> all batch users. Any suggestions are warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of
> the 3 Flink APIs (a broader subject than this article), users can
> easily mix the APIs in the same pipeline. If we really want to have
> these operations in the DataStream API, maybe wrapping state-based
> implementations could be a good option if their performance meets our
> expectations.
>
>>
>> Best,
>> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2] 
>> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>>
>>
>>
>>     ------------------------------------------------------------------
>>     From: liu ron <ro...@gmail.com>
>>     Send Time: 2022 Nov. 8 (Tue.) 10:21
>>     To: dev <de...@flink.apache.org>; Etienne Chauchot
>>     <ec...@apache.org>; user <us...@flink.apache.org>
>>     Subject: Re: [blog article] Howto migrate a real-life batch
>>     pipeline from the DataSet API to the DataStream API
>>
>>     Thanks for your post. It looks very good to me, and maybe also
>>     useful for developers.
>>
>>     Best,
>>     Liudalong
>>
>>     yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>>     Wow, cool!  Thanks for your work.
>>     It'll definitely be helpful for users that want to migrate
>>     their batch jobs from the DataSet API to the DataStream API.
>>
>>     Best regards,
>>     Yuxia
>>
>>     ----- Original Message -----
>>     From: "Etienne Chauchot" <ec...@apache.org>
>>     To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>>     Sent: Monday, Nov 7, 2022 at 10:29:54 PM
>>     Subject: [blog article] Howto migrate a real-life batch pipeline from
>>     the DataSet API to the DataStream API
>>
>>     Hi everyone,
>>
>>     In case some of you are interested, I just posted a blog article
>>     about
>>     migrating a real-life batch pipeline from the DataSet API to the
>>     DataStream API:
>>
>>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>>
>>     Best
>>
>>     Etienne
>>
>>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,

And by the way, I was planing on writing another article to compare the 
performances of DataSet, DataStream and SQL APIs over TPCDS query3. I 
thought that I could run the pipelines on an Amazon EMR cluster with 
different data sizes 1GB, 100GB, 1TB.

Would it be worth it, what do you think ?

Best

Etienne

Le 09/11/2022 à 10:04, Etienne Chauchot a écrit :
>
> Hi Yun Gao,
>
> thanks for your email and your review !
>
> My comments are inline
>
> Le 08/11/2022 à 06:51, Yun Gao a écrit :
>> Hi Etienne,
>>
>> Very thanks for the article! Flink is currently indeed keeping 
>> increasing the
>> ability of unified batch / stream processing with the same api, and 
>> its a great
>> pleasure that more and more users are trying this functionality. But 
>> I also
>> have some questions regarding some details.
>>
>> First IMO, as a whole for the long run Flink will have two unified 
>> APIs, namely Table / SQL
>> API and DataStream API. Users could express the computation logic 
>> with these two APIs
>> for both bounded and unbounded data processing.
>
>
> Yes that is what I understood also throughout the discussions and 
> jiras. And I also think IMHO that reducing the number of APIs to 2 was 
> the good move.
>
>
>> Underlying Flink provides two
>> execution modes:  the streaming mode works with both bounded and 
>> unbounded data,
>> and it executes in a way of incremental processing based on state; 
>> the batch mode works
>> only with bounded data, and it executes in a ways level-by-level 
>> similar to the traditional
>> batch processing frameworks. Users could switch the execution mode via
>> EnvironmentSettings.inBatchMode() for 
>> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in Flink docs(1) I have enabled the batch mode as I 
> though it would be more efficient on my bounded pipeline but as a 
> matter of fact the streaming mode seems to be more efficient on my use 
> case. I'll test with higher volumes to confirm.
>
>
>>
>> Specially for DataStream, as implemented in FLIP-140, currently all 
>> the existing DataStream
>> operation supports the batch execution mode in a unified way[1]:  
>> data will be sorted for the
>> keyBy() edges according to the key, then the following operations 
>> like reduce() could receive
>> all the data belonging to the same key consecutively, then it could 
>> directly reducing the records
>> of the same key without maintaining the intermediate states. In this 
>> way users could write the
>> same code for both streaming and batch processing with the same code.
>
>
> Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
> pipeline will work with no modification if I plug an unbounded source 
> to it.
>
>>
>>
>> # Regarding the migration of Join / Reduce
>>
>> First I think Reduce is always supported and users could write 
>> dataStream.keyBy().reduce(xx)
>> directly, and  if batch  execution mode is set, the reduce will not 
>> be executed in a incremental way,
>> instead is acts much  like sort-based  aggregation in the traditional 
>> batch processing framework.
>>
>> Regarding Join, although the issue of FLINK-22587 indeed exists: 
>> current join has to be bound
>> to a window and the GlobalWindow does not work properly, but with 
>> some more try currently
>> it does not need users to  re-write the whole join from scratch: 
>> Users could write a dedicated
>> window assigner that assigns all the  records to the same window 
>> instance  and return
>> EventTimeTrigger.create() as the default event-time trigger [2]. Then 
>> it works
>>
>> source1.join(source2)
>>                 .where(a -> a.f0)
>>                 .equalTo(b -> b.f0)
>>                 .window(new EndOfStreamWindows())
>>                 .apply(xxxx);
>>
>> It does not requires records have event-time attached since the 
>> trigger of window is only
>> relying on the time range of the window and the assignment does not 
>> need event-time either.
>>
>> The behavior of the join is also similar to sort-based join if batch 
>> mode is enabled.
>>
>> Of course it is not easy to use to let users do the workaround and 
>> we'll try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that 
> I proposed. I tried it and it works perfectly with similar 
> performance. Thanks.
>
>>
>> # Regarding support of Sort / Limit
>>
>> Currently these two operators are indeed not supported in the 
>> DataStream API directly. One initial
>> though for these two operations are that users may convert the 
>> DataStream to Table API and use
>> Table API for these two operators:
>>
>> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
>> Table tableXX = tableEnv.fromDataStream(dataStream);
>> tableXX.orderBy($("a").asc());
>
>
> Yes I knew that workaround but I decided not to use it because I have 
> a special SQL based implementation (for comparison reasons) so I did 
> not want to mix SQL and DataStream APIs in the same pipeline.
>
>>
>> How do you think about this option? We are also assessing if the 
>> combination of DataStream
>> API / Table API is sufficient for all the batch users. Any 
>> suggestions are warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of 
> the 3 Flink APIs (broader subject than this article), users can easily 
> mix the APIs in the same pipeline. If we really want to have these 
> operations in the DataStream API maybe wrapping state-based 
> implementations could be good if their performance meets our expectations.
>
>>
>> Best,
>> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2] 
>> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>>
>>
>>
>>     ------------------------------------------------------------------
>>     From:liu ron <ro...@gmail.com>
>>     Send Time:2022 Nov. 8 (Tue.) 10:21
>>     To:dev <de...@flink.apache.org>; Etienne Chauchot
>>     <ec...@apache.org>; user <us...@flink.apache.org>
>>     Subject:Re: [blog article] Howto migrate a real-life batch
>>     pipeline from the DataSet API to the DataStream API
>>
>>     Thanks for your post, It looks very good to me, also maybe for
>>     developers,
>>
>>     Best,
>>     Liudalong
>>
>>     yuxia <lu...@alumni.sjtu.edu.cn> 于2022年11月8日周二 09:11写道:
>>     Wow, cool!  Thanks for your work.
>>     It'll be definitely helpful for the users that want to migrate
>>     their batch job from DataSet API to DataStream API.
>>
>>     Best regards,
>>     Yuxia
>>
>>     ----- 原始邮件 -----
>>     发件人: "Etienne Chauchot" <ec...@apache.org>
>>     收件人: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>>     发送时间: 星期一, 2022年 11 月 07日 下午 10:29:54
>>     主题: [blog article] Howto migrate a real-life batch pipeline from
>>     the DataSet API to the DataStream API
>>
>>     Hi everyone,
>>
>>     In case some of you are interested, I just posted a blog article
>>     about
>>     migrating a real-life batch pipeline from the DataSet API to the
>>     DataStream API:
>>
>>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>>
>>     Best
>>
>>     Etienne
>>
>>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Yun Gao,

FYI I just updated the article after your review: 
https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne

Le 09/11/2022 à 10:04, Etienne Chauchot a écrit :
>
> Hi Yun Gao,
>
> thanks for your email and your review !
>
> My comments are inline
>
> Le 08/11/2022 à 06:51, Yun Gao a écrit :
>> Hi Etienne,
>>
>> Very thanks for the article! Flink is currently indeed keeping 
>> increasing the
>> ability of unified batch / stream processing with the same api, and 
>> its a great
>> pleasure that more and more users are trying this functionality. But 
>> I also
>> have some questions regarding some details.
>>
>> First IMO, as a whole for the long run Flink will have two unified 
>> APIs, namely Table / SQL
>> API and DataStream API. Users could express the computation logic 
>> with these two APIs
>> for both bounded and unbounded data processing.
>
>
> Yes that is what I understood also throughout the discussions and 
> jiras. And I also think IMHO that reducing the number of APIs to 2 was 
> the good move.
>
>
>> Underlying Flink provides two
>> execution modes:  the streaming mode works with both bounded and 
>> unbounded data,
>> and it executes in a way of incremental processing based on state; 
>> the batch mode works
>> only with bounded data, and it executes in a ways level-by-level 
>> similar to the traditional
>> batch processing frameworks. Users could switch the execution mode via
>> EnvironmentSettings.inBatchMode() for 
>> StreamExecutionEnvironment.setRuntimeMode().
>
> As recommended in Flink docs(1) I have enabled the batch mode as I 
> though it would be more efficient on my bounded pipeline but as a 
> matter of fact the streaming mode seems to be more efficient on my use 
> case. I'll test with higher volumes to confirm.
>
>
>>
>> Specifically for DataStream, as implemented in FLIP-140, all the
>> existing DataStream operations currently support the batch execution
>> mode in a unified way [1]: data will be sorted on the keyBy() edges
>> according to the key, so the following operations like reduce() can
>> receive all the data belonging to the same key consecutively and
>> directly reduce the records of the same key without maintaining
>> intermediate state. In this way users can write the same code for
>> both streaming and batch processing.
>
>
> Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
> pipeline will work with no modification if I plug an unbounded source
> into it.
>
>>
>>
>> # Regarding the migration of Join / Reduce
>>
>> First, I think Reduce is always supported: users could write
>> dataStream.keyBy().reduce(xx) directly, and if batch execution mode
>> is set, the reduce will not be executed in an incremental way;
>> instead it acts much like sort-based aggregation in a traditional
>> batch processing framework.
>>
>> Regarding Join, the issue of FLINK-22587 indeed exists: the current
>> join has to be bound to a window, and GlobalWindow does not work
>> properly. But with a bit more effort, users currently do not need to
>> re-write the whole join from scratch: they could write a dedicated
>> window assigner that assigns all the records to the same window
>> instance and returns EventTimeTrigger.create() as the default
>> event-time trigger [2]. Then it works:
>>
>> source1.join(source2)
>>                 .where(a -> a.f0)
>>                 .equalTo(b -> b.f0)
>>                 .window(new EndOfStreamWindows())
>>                 .apply(xxxx);
>>
>> It does not require records to have event time attached, since the
>> trigger of the window relies only on the time range of the window,
>> and the assignment does not need event time either.
>>
>> The behavior of the join is also similar to a sort-based join if
>> batch mode is enabled.
>>
>> Of course this workaround is not easy for users to apply, and we'll
>> try to fix this issue in 1.17.
>
>
> Yes, this is a better workaround than the manual state-based join that 
> I proposed. I tried it and it works perfectly with similar 
> performance. Thanks.
>
>>
>> # Regarding support of Sort / Limit
>>
>> Currently these two operators are indeed not supported in the
>> DataStream API directly. One initial thought for these two
>> operations is that users may convert the DataStream to the Table API
>> and use the Table API for these two operators:
>>
>> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
>> Table tableXX = tableEnv.fromDataStream(dataStream);
>> tableXX.orderBy($("a").asc());
>
>
> Yes, I knew that workaround, but I decided not to use it because I
> have a dedicated SQL-based implementation (for comparison reasons),
> so I did not want to mix the SQL and DataStream APIs in the same
> pipeline.
>
>>
>> What do you think about this option? We are also assessing whether
>> the combination of the DataStream API / Table API is sufficient for
>> all batch users. Any suggestions are warmly welcome.
>
>
> I guess that outside of my use case of comparing the performance of
> the 3 Flink APIs (a broader subject than this article), users can
> easily mix the APIs in the same pipeline. If we really want to have
> these operations in the DataStream API, maybe wrapping state-based
> implementations could be good, if their performance meets our
> expectations.
>
>>
>> Best,
>> Yun Gao
>
> I'll update the article and the code with your suggestions. Thanks again.
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode
>
>
> Best
>
> Etienne
>
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2] 
>> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>>
>>
>>
>>     ------------------------------------------------------------------
>>     From:liu ron <ro...@gmail.com>
>>     Send Time:2022 Nov. 8 (Tue.) 10:21
>>     To:dev <de...@flink.apache.org>; Etienne Chauchot
>>     <ec...@apache.org>; user <us...@flink.apache.org>
>>     Subject:Re: [blog article] Howto migrate a real-life batch
>>     pipeline from the DataSet API to the DataStream API
>>
>>     Thanks for your post, it looks very good to me, and it may be
>>     useful for developers too.
>>
>>     Best,
>>     Liudalong
>>
>>     yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>>     Wow, cool!  Thanks for your work.
>>     It'll definitely be helpful for users who want to migrate
>>     their batch jobs from the DataSet API to the DataStream API.
>>
>>     Best regards,
>>     Yuxia
>>
>>     ----- Original Message -----
>>     From: "Etienne Chauchot" <ec...@apache.org>
>>     To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>>     Sent: Monday, November 7, 2022, 10:29:54 PM
>>     Subject: [blog article] Howto migrate a real-life batch pipeline from
>>     the DataSet API to the DataStream API
>>
>>     Hi everyone,
>>
>>     In case some of you are interested, I just posted a blog article
>>     about
>>     migrating a real-life batch pipeline from the DataSet API to the
>>     DataStream API:
>>
>>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>>
>>     Best
>>
>>     Etienne
>>
>>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Yun Gao,

thanks for your email and your review !

My comments are inline

Le 08/11/2022 à 06:51, Yun Gao a écrit :
> Hi Etienne,
>
> Many thanks for the article! Flink is indeed continuing to increase
> its ability to unify batch / stream processing with the same API, and
> it's a great pleasure that more and more users are trying this
> functionality. But I also have some questions regarding some details.
>
> First IMO, as a whole for the long run Flink will have two unified 
> APIs, namely Table / SQL
> API and DataStream API. Users could express the computation logic with 
> these two APIs
> for both bounded and unbounded data processing.


Yes, that is also what I understood from the discussions and Jira
tickets. And IMHO reducing the number of APIs to 2 was the right move.


> Under the hood, Flink provides two execution modes: the streaming
> mode works with both bounded and unbounded data, and it executes in a
> way of incremental processing based on state; the batch mode works
> only with bounded data, and it executes level by level, similar to
> traditional batch processing frameworks. Users could switch the
> execution mode via EnvironmentSettings.inBatchMode() or
> StreamExecutionEnvironment.setRuntimeMode().

As recommended in the Flink docs [1], I enabled batch mode as I
thought it would be more efficient for my bounded pipeline, but as a
matter of fact streaming mode seems to be more efficient for my use
case. I'll test with higher volumes to confirm.
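
For reference, here is a minimal sketch of the toggle I use to compare
the two modes (the source and the transformation are placeholders, not
the actual query code):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeSwitch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // BATCH runs bounded pipelines level by level with sort-based
        // shuffles; STREAMING runs them incrementally with state.
        // AUTOMATIC picks BATCH when all the sources are bounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3) // placeholder bounded source
                .map(i -> i * 2)
                .print();

        env.execute("execution-mode-demo");
    }
}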


>
> Specifically for DataStream, as implemented in FLIP-140, all the
> existing DataStream operations currently support the batch execution
> mode in a unified way [1]: data will be sorted on the keyBy() edges
> according to the key, so the following operations like reduce() can
> receive all the data belonging to the same key consecutively and
> directly reduce the records of the same key without maintaining
> intermediate state. In this way users can write the same code for
> both streaming and batch processing.


Yes, I have no doubt that my resulting Query3ViaFlinkRowDatastream
pipeline will work with no modification if I plug an unbounded source
into it.

>
>
> # Regarding the migration of Join / Reduce
>
> First, I think Reduce is always supported: users could write
> dataStream.keyBy().reduce(xx) directly, and if batch execution mode
> is set, the reduce will not be executed in an incremental way;
> instead it acts much like sort-based aggregation in a traditional
> batch processing framework.
>
> Regarding Join, the issue of FLINK-22587 indeed exists: the current
> join has to be bound to a window, and GlobalWindow does not work
> properly. But with a bit more effort, users currently do not need to
> re-write the whole join from scratch: they could write a dedicated
> window assigner that assigns all the records to the same window
> instance and returns EventTimeTrigger.create() as the default
> event-time trigger [2]. Then it works:
>
> source1.join(source2)
>                 .where(a -> a.f0)
>                 .equalTo(b -> b.f0)
>                 .window(new EndOfStreamWindows())
>                 .apply(xxxx);
>
> It does not require records to have event time attached, since the
> trigger of the window relies only on the time range of the window,
> and the assignment does not need event time either.
>
> The behavior of the join is also similar to a sort-based join if
> batch mode is enabled.
>
> Of course this workaround is not easy for users to apply, and we'll
> try to fix this issue in 1.17.


Yes, this is a better workaround than the manual state-based join that I 
proposed. I tried it and it works perfectly with similar performance. 
Thanks.
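
For the record, here is a condensed sketch of what such an assigner can
look like: a shortened take on the flink-ml EndOfStreamWindows linked
in [2], written against the Flink 1.16 WindowAssigner API, not the
exact flink-ml code:

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Assigns every record to a single window covering the whole time
// range, so the window fires only at the end of the (bounded) input,
// when the final watermark jumps to +infinity.
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow ALL =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Same window instance for all records, whatever their timestamp.
        return Collections.singletonList(ALL);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(
            StreamExecutionEnvironment env) {
        // Fires once the watermark passes the window's max timestamp.
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig config) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}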

>
> # Regarding support of Sort / Limit
>
> Currently these two operators are indeed not supported in the
> DataStream API directly. One initial thought for these two
> operations is that users may convert the DataStream to the Table API
> and use the Table API for these two operators:
>
> DataStream<xx> xx = ... // Keeps the customized logic in DataStream
> Table tableXX = tableEnv.fromDataStream(dataStream);
> tableXX.orderBy($("a").asc());


Yes, I knew that workaround, but I decided not to use it because I have
a dedicated SQL-based implementation (for comparison reasons), so I did
not want to mix the SQL and DataStream APIs in the same pipeline.

>
> What do you think about this option? We are also assessing whether
> the combination of the DataStream API / Table API is sufficient for
> all batch users. Any suggestions are warmly welcome.


I guess that outside of my use case of comparing the performance of the
3 Flink APIs (a broader subject than this article), users can easily
mix the APIs in the same pipeline. If we really want to have these
operations in the DataStream API, maybe wrapping state-based
implementations could be good, if their performance meets our
expectations.
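
That said, for readers who do not have that constraint, a minimal
sketch of the suggested mix could look like the code below. The column
name f0 and the data are illustrative, and I assume Flink 1.14+ where
StreamTableEnvironment supports batch mode:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class SortViaTableApi {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // A plain orderBy (no time attribute) is only supported in batch mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

        // Placeholder bounded source; the custom logic stays in DataStream.
        DataStream<Integer> dataStream = env.fromElements(3, 1, 2);

        // Hand over to the Table API for the missing operators:
        // orderBy (sort) and fetch (limit). "f0" is the default column
        // name for an atomic type.
        Table table = tableEnv.fromDataStream(dataStream);
        Table sorted = table.orderBy($("f0").asc()).fetch(2);

        // Convert back to a DataStream of Rows to continue the pipeline.
        DataStream<Row> result = tableEnv.toDataStream(sorted);
        result.print();

        env.execute("sort-limit-via-table-api");
    }
}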

>
> Best,
> Yun Gao

I'll update the article and the code with your suggestions. Thanks again.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/#when-canshould-i-use-batch-execution-mode


Best

Etienne

>
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2] 
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
>
>
>
>     ------------------------------------------------------------------
>     From:liu ron <ro...@gmail.com>
>     Send Time:2022 Nov. 8 (Tue.) 10:21
>     To:dev <de...@flink.apache.org>; Etienne Chauchot
>     <ec...@apache.org>; user <us...@flink.apache.org>
>     Subject:Re: [blog article] Howto migrate a real-life batch
>     pipeline from the DataSet API to the DataStream API
>
>     Thanks for your post, it looks very good to me, and it may be
>     useful for developers too.
>
>     Best,
>     Liudalong
>
>     yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
>     Wow, cool!  Thanks for your work.
>     It'll definitely be helpful for users who want to migrate
>     their batch jobs from the DataSet API to the DataStream API.
>
>     Best regards,
>     Yuxia
>
>     ----- Original Message -----
>     From: "Etienne Chauchot" <ec...@apache.org>
>     To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
>     Sent: Monday, November 7, 2022, 10:29:54 PM
>     Subject: [blog article] Howto migrate a real-life batch pipeline from
>     the DataSet API to the DataStream API
>
>     Hi everyone,
>
>     In case some of you are interested, I just posted a blog article
>     about
>     migrating a real-life batch pipeline from the DataSet API to the
>     DataStream API:
>
>     https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
>     Best
>
>     Etienne
>
>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Etienne,

Many thanks for the article! Flink is indeed continuing to increase its
ability to unify batch / stream processing with the same API, and it's
a great pleasure that more and more users are trying this
functionality. But I also have some questions regarding some details.

First, IMO, in the long run Flink as a whole will have two unified
APIs, namely the Table / SQL API and the DataStream API. Users could
express the computation logic with these two APIs for both bounded and
unbounded data processing. Under the hood, Flink provides two execution
modes: the streaming mode works with both bounded and unbounded data,
and it executes in a way of incremental processing based on state; the
batch mode works only with bounded data, and it executes level by
level, similar to traditional batch processing frameworks. Users could
switch the execution mode via EnvironmentSettings.inBatchMode() or
StreamExecutionEnvironment.setRuntimeMode().

Specifically for DataStream, as implemented in FLIP-140, all the
existing DataStream operations currently support the batch execution
mode in a unified way [1]: data will be sorted on the keyBy() edges
according to the key, so the following operations like reduce() can
receive all the data belonging to the same key consecutively and
directly reduce the records of the same key without maintaining
intermediate state. In this way users can write the same code for both
streaming and batch processing.

# Regarding the migration of Join / Reduce

First, I think Reduce is always supported: users could write
dataStream.keyBy().reduce(xx) directly, and if batch execution mode is
set, the reduce will not be executed in an incremental way; instead it
acts much like sort-based aggregation in a traditional batch processing
framework.
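
As a minimal self-contained illustration (the data and the key are made
up for the example):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedReduceBatchDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // In BATCH mode the keyBy() edge is sorted by key, so reduce()
        // sees each key's records consecutively and keeps no long-lived
        // state.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(
                        Tuple2.of("a", 1), Tuple2.of("b", 10), Tuple2.of("a", 2))
                .keyBy(t -> t.f0)
                // Sum the values per key.
                .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1))
                .print(); // (a,3) and (b,10), output order not guaranteed

        env.execute("keyed-reduce-batch-demo");
    }
}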

Regarding Join, the issue of FLINK-22587 indeed exists: the current
join has to be bound to a window, and GlobalWindow does not work
properly. But with a bit more effort, users currently do not need to
re-write the whole join from scratch: they could write a dedicated
window assigner that assigns all the records to the same window
instance and returns EventTimeTrigger.create() as the default
event-time trigger [2]. Then it works:

source1.join(source2)
       .where(a -> a.f0)
       .equalTo(b -> b.f0)
       .window(new EndOfStreamWindows())
       .apply(xxxx);

It does not require records to have event time attached, since the
trigger of the window relies only on the time range of the window, and
the assignment does not need event time either.

The behavior of the join is also similar to a sort-based join if batch
mode is enabled.

Of course this workaround is not easy for users to apply, and we'll try
to fix this issue in 1.17.

# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the
DataStream API directly. One initial thought for these two operations
is that users may convert the DataStream to the Table API and use the
Table API for these two operators:

DataStream<xx> xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(dataStream);
tableXX.orderBy($("a").asc());

What do you think about this option? We are also assessing whether the
combination of the DataStream API / Table API is sufficient for all
batch users. Any suggestions are warmly welcome.

Best,
Yun Gao
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2] https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java
------------------------------------------------------------------
From: liu ron <ro...@gmail.com>
Send Time: 2022 Nov. 8 (Tue.) 10:21
To: dev <de...@flink.apache.org>; Etienne Chauchot <ec...@apache.org>; user <us...@flink.apache.org>
Subject: Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Thanks for your post, it looks very good to me, and it may be useful for developers too.

Best,
Liudalong

yuxia <luoyuxia@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:
Wow, cool! Thanks for your work.
It'll definitely be helpful for users who want to migrate their batch jobs from the DataSet API to the DataStream API.

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot" <echauchot@apache.org>
To: "dev" <dev@flink.apache.org>, "User" <user@flink.apache.org>
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article about
migrating a real-life batch pipeline from the DataSet API to the
DataStream API:

https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by liu ron <ro...@gmail.com>.
Thanks for your post, it looks very good to me, and it may be useful for developers too.

Best,
Liudalong

yuxia <lu...@alumni.sjtu.edu.cn> wrote on Tue, Nov 8, 2022 at 09:11:

> Wow, cool!  Thanks for your work.
> It'll definitely be helpful for users who want to migrate their batch
> jobs from the DataSet API to the DataStream API.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Etienne Chauchot" <ec...@apache.org>
> To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
> Sent: Monday, November 7, 2022, 10:29:54 PM
> Subject: [blog article] Howto migrate a real-life batch pipeline from the
> DataSet API to the DataStream API
>
> Hi everyone,
>
> In case some of you are interested, I just posted a blog article about
> migrating a real-life batch pipeline from the DataSet API to the
> DataStream API:
>
>
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
>

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Wow, cool!  Thanks for your work.
It'll definitely be helpful for users who want to migrate their batch jobs from the DataSet API to the DataStream API.

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot" <ec...@apache.org>
To: "dev" <de...@flink.apache.org>, "User" <us...@flink.apache.org>
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article about 
migrating a real-life batch pipeline from the DataSet API to the 
DataStream API:

https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne
