Posted to user@flink.apache.org by Peter Groesbeck <pe...@gmail.com> on 2020/05/04 23:40:20 UTC

Writing _SUCCESS Files (Streaming and Batch)

I am replacing an M/R job with a Streaming job using the StreamingFileSink
and there is a requirement to generate an empty _SUCCESS file like the old
Hadoop job. I have to implement a similar Batch job to read from backup
files in case of outages or downtime.

The Batch job question was answered here and appears to be still relevant
although if someone could confirm for me that would be great.
https://stackoverflow.com/a/39413810
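
If I'm reading that answer right, the batch side boils down to wrapping a Hadoop
OutputFormat so that Hadoop's FileOutputCommitter writes the marker when the job
commits. A rough, untested sketch of what I have in mind (assuming the
flink-hadoop-compatibility dependency is on the classpath; the output path is just
a placeholder):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class BatchSuccessMarkerSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hadoop's FileOutputCommitter writes the _SUCCESS marker on commitJob as long
    // as mapreduce.fileoutputcommitter.marksuccessfuljobs is true (the default).
    Job job = Job.getInstance();
    FileOutputFormat.setOutputPath(job, new Path("s3://my-bucket/backup-output"));

    DataSet<Tuple2<NullWritable, Text>> records = env
        .fromElements("record-1", "record-2")
        .map(new MapFunction<String, Tuple2<NullWritable, Text>>() {
          @Override
          public Tuple2<NullWritable, Text> map(String value) {
            return Tuple2.of(NullWritable.get(), new Text(value));
          }
        });

    records.output(new HadoopOutputFormat<NullWritable, Text>(
        new TextOutputFormat<NullWritable, Text>(), job));
    env.execute("batch job with _SUCCESS marker");
  }
}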

The question of the Streaming job came up back in 2018 here:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date
now that BucketingSink has been deprecated.

Is there a way to implement a similar solution for StreamingFileSink?

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future.

Thank you,
Peter

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Guowei Ma <gu...@gmail.com>.
Hi Theo


Sorry for the late reply and thanks for your detailed explanation.


From your description, I understand that:

   1. Impala only handles the parquet files that it has been
   notified about (scanned).
   2. You can accept that the state is updated only once per partition in
   your scenario.
   3. Connections to Impala are limited in your production
   environment.

So if point 2 is correct, I think a “listener” that only notifies when the
partition is closed/finished could meet your need. WDYT?

Best,
Guowei


Theo Diefenthal <th...@scoop-software.de> wrote on Wednesday, May 13, 2020 at 5:16 PM:

> Hi Guowei,
>
> Impala is a database that can execute fast SQL Queries on parquet data. It
> has its own small metadata store for the parquet-tables created in there.
> In that store, it remembers the .parquet files in each partition and also
> stores some statistics like number of rows and so on.
>
> If I have a new .parquet file in my partition (or a new partition), I need
> to tell Impala about it. Otherwise, Impala won't take those files into
> account for its queries. So I execute a query to impala like
>
> ALTER TABLE MYDATA ADD IF NOT EXISTS PARTITION (partitionkey="20200513T100000")
>
> Impala will add this partition and/or scan it for new .parquet-files and
> update its metastore. This can be run more generically like
>
> ALTER TABLE MYDATA RECOVER PARTITIONS
> and/or
> REFRESH MYDATA
>
> But those queries take more time to execute. Furthermore, I want to update
> the table statistics as well (Prior stuff just told impala about new
> .parquet files). I run a query like this
>
> COMPUTE INCREMENTAL STATS MYDATA PARTITION (partitionkey="20200513T100000")
>
> This query can run for a rather long time, speaking about a few minutes
> for reasonable tables of few terabytes in size. I can leave the
> partitionkey stuff away and perform the query over the entire table, but
> then, it will take even more time to be computed. (Note that I think of
> optimizing my solution a bit and issue the Incremental Stats statement with
> the watermark in a later version. The stats are not required for an impala
> table, it just helps its planner for table with joins on how to build the
> execution plan. So I can wait with updating the stats up until the
> watermark passes and perform this query really only once)
>
> Furthermore, in our prod environment, we are not allowed to have too many
> simultaneous connections to Impala. The duration of the SQL statements and
> the requirement that we don't have too many connections led us to have
> those impala updates in a parallelism 1 task. Usually, all our task
> managers write data to the same partition (Streaming events from "now" and
> partitioned tables on an hourly basis). So there is no need that each
> taskmanager tells impala to update the very same partition multiple times.
> In my parallelism 1 task, I see that they all worked on the same partition
> and submit one query to impala to update this partition.
>
> Having a notifier sounds great, if it can be executed with parallelism 1
> for all sink tasks..
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Guowei Ma" <gu...@gmail.com>
> *An: *"Theo Diefenthal" <th...@scoop-software.de>
> *CC: *"user" <us...@flink.apache.org>, "yungao gy" <yu...@aliyun.com>
> *Gesendet: *Mittwoch, 13. Mai 2020 09:15:37
> *Betreff: *Re: 回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi, Theo
> Thank you for sharing your solution.
> From your description, it seems that what you need is a listener that
> could notify you of state changes of the partition/bucket:
> created/updated/closed (maybe you don't need the close notification).
> I am not familiar with Impala. So what I want to know is why you need to
> be notified every time the partition gets new data. Would you like to
> give a more detailed description?
>
> Best,
> Guowei
>
>
> Theo Diefenthal <th...@scoop-software.de> wrote on Wednesday, May 13, 2020 at 12:00 AM:
>
>> Hi Yun,
>>
>> For me, that sounds quite nice. I implemented the same for my application
>> a few weeks ago, but of course tailored only to my app.
>> What I did:
>> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
>> 2. I extended the default ProcessOperator and instead of
>> "notifyCheckpointComplete(long checkpointId)", I provided my
>> WrappedProcessFunction a "notifyCheckpointComplete(checkpointId,
>> lastCommitWatermark)".
>> 3. I added a custom sink with parallelism 1 behind the
>> WrappedProcessFunction.
>> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
>> message downstream to the parallelism 1 sink containing data about which
>> partitions were written to in the phase since the last checkpoint.
>> 5. In the parallelism 1 sink, I make sure that I get the messages from
>> all upstream tasks (give the constructor an int parameter telling it the
>> parallelism of the WrappedProcessFunction) and then perform my parallelism
>> 1 operation, in my case, telling Impala which partitions were added or got
>> new data. Luckily, in case of Impala, that operation can be made idempotent
>> so I only needed to make sure that I have at-least-once processing from
>> the state perspective here.
>>
>> I had to go for notifyCheckpointComplete as only there, the parquet files
>> are ultimately committed and thus available for spark, impala and so on.
>>
>> So if you go on with that issue, I'd be really happy to be able to
>> customize the solution and e.g. get rid of my custom setup by only
>> specifying kind of a lambda function which should be run with parallelism
>> 1 and update impala. That function would however still need the info which
>> partitions were updated/added.
>> And in my case, I was not really interested in the watermark (I sent it
>> downstream only for metric purposes) but want to tell impala after each
>> commit which partitions changed, regardless of the value from the watermark.
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> *Von: *"Yun Gao" <yu...@aliyun.com>
>> *An: *"Robert Metzger" <rm...@apache.org>, "Jingsong Li" <
>> jingsonglee0@gmail.com>
>> *CC: *"Peter Groesbeck" <pe...@gmail.com>, "user" <
>> user@flink.apache.org>
>> *Gesendet: *Dienstag, 12. Mai 2020 10:36:59
>> *Betreff: *回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>>
>> Hi Peter,
>>
>>     Sorry for missing the question and responding late, I'm currently
>> working together with Jingsong on the issue to support "global committing"
>> (like writing _SUCCESS file or adding partitions to hive store) after
>> buckets terminated. In 1.11 we may first support watermark/time related
>> buckets in Table/SQL API, and we are also thinking of supporting "global
>> committing" for arbitrary bucket assigner policy for StreamingFileSink
>> users. The current rough thought is to let users specify when a bucket is
>> terminated on a single task, and the OperatorCoordinator[1] of the sink
>> will aggregate the information from all subtasks about this bucket and do
>> the global committing if the bucket has been finished on all the subtasks,
>> but this is still under thinking and discussion. Any thoughts or
>> requirements on this issue are warmly welcome.
>>
>> Best,
>>  Yun
>>
>>
>> [1] OperatorCoordinator is introduced in FLIP-27:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
>> This is a component that resides in the JobManager and could communicate with all
>> the subtasks of the corresponding operator, thus it could be used to
>> aggregate status from subtasks.
>>
>> ------------------ Original Message ------------------
>> *From: *Robert Metzger <rm...@apache.org>
>> *Sent: *Tue May 12 15:36:26 2020
>> *To: *Jingsong Li <ji...@gmail.com>
>> *Cc: *Peter Groesbeck <pe...@gmail.com>, user <
>> user@flink.apache.org>
>> *Subject: *Re: Writing _SUCCESS Files (Streaming and Batch)
>>
>>> Hi Peter,
>>> I filed a ticket for this feature request:
>>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add
>>> your thoughts / requirements to the ticket)
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li <ji...@gmail.com>
>>> wrote:
>>>
>>>> Hi Peter,
>>>> The troublesome part is how to know the "ending" of a bucket in a
>>>> streaming job.
>>>> In 1.11, we are trying to implement a watermark-related bucket ending
>>>> mechanism[1] in Table/SQL.
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <
>>>> peter.groesbeck@gmail.com> wrote:
>>>>
>>>>> I am replacing an M/R job with a Streaming job using the
>>>>> StreamingFileSink and there is a requirement to generate an empty _SUCCESS
>>>>> file like the old Hadoop job. I have to implement a similar Batch job to
>>>>> read from backup files in case of outages or downtime.
>>>>>
>>>>> The Batch job question was answered here and appears to be still
>>>>> relevant although if someone could confirm for me that would be great.
>>>>> https://stackoverflow.com/a/39413810
>>>>>
>>>>> The question of the Streaming job came up back in 2018 here:
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E
>>>>>
>>>>> But the solution to use or extend the BucketingSink class seems out of
>>>>> date now that BucketingSink has been deprecated.
>>>>>
>>>>> Is there a way to implement a similar solution for StreamingFileSink?
>>>>>
>>>>> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
>>>>> future.
>>>>>
>>>>> Thank you,
>>>>> Peter
>>>>>
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Guowei, 

Impala is a database that can execute fast SQL Queries on parquet data. It has its own small metadata store for the parquet-tables created in there. In that store, it remembers the .parquet files in each partition and also stores some statistics like number of rows and so on. 

If I have a new .parquet file in my partition (or a new partition), I need to tell Impala about it. Otherwise, Impala won't take those files into account for its queries. So I execute a query to impala like 
ALTER TABLE MYDATA ADD IF NOT EXISTS PARTITION (partitionkey="20200513T100000")
Impala will add this partition and/or scan it for new .parquet-files and update its metastore. This can be run more generically like 
ALTER TABLE MYDATA RECOVER PARTITIONS 
and/or 
REFRESH MYDATA 
But those queries take more time to execute. Furthermore, I want to update the table statistics as well (Prior stuff just told impala about new .parquet files). I run a query like this 
COMPUTE INCREMENTAL STATS MYDATA PARTITION (partitionkey="20200513T100000")
This query can run for a rather long time, speaking about a few minutes for reasonable tables of few terabytes in size. I can leave the partitionkey stuff away and perform the query over the entire table, but then, it will take even more time to be computed. (Note that I think of optimizing my solution a bit and issue the Incremental Stats statement with the watermark in a later version. The stats are not required for an impala table, it just helps its planner for table with joins on how to build the execution plan. So I can wait with updating the stats up until the watermark passes and perform this query really only once) 

Furthermore, in our prod environment, we are not allowed to have too many simultaneous connections to Impala. The duration of the SQL statements and the requirement that we don't have too many connections led us to have those impala updates in a parallelism 1 task. Usually, all our task managers write data to the same partition (Streaming events from "now" and partitioned tables on an hourly basis). So there is no need that each taskmanager tells impala to update the very same partition multiple times. In my parallelism 1 task, I see that they all worked on the same partition and submit one query to impala to update this partition.
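
Stripped down, that parallelism 1 task is little more than the following sketch
(simplified: deduplication of partition keys, retries and our real table names are
left out, and the JDBC URL is only a placeholder for whatever Impala endpoint and
driver are configured):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Each incoming element is a partition key that some upstream subtask wrote to
// since the last checkpoint. ADD IF NOT EXISTS is idempotent, so at-least-once
// delivery of these messages is good enough.
public class ImpalaPartitionUpdateSink extends RichSinkFunction<String> {

  private transient Connection connection;

  @Override
  public void open(Configuration parameters) throws Exception {
    // Placeholder connection string; in reality this comes from our config.
    connection = DriverManager.getConnection("jdbc:impala://impala-host:21050/default");
  }

  @Override
  public void invoke(String partitionKey, Context context) throws Exception {
    try (Statement stmt = connection.createStatement()) {
      stmt.execute("ALTER TABLE MYDATA ADD IF NOT EXISTS PARTITION (partitionkey=\""
          + partitionKey + "\")");
    }
  }

  @Override
  public void close() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}

It is attached behind the upstream tasks with addSink(new ImpalaPartitionUpdateSink()).setParallelism(1).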

Having a notifier sounds great, if it can be executed with parallelism 1 for all sink tasks.. 

Best regards 
Theo 


Von: "Guowei Ma" <gu...@gmail.com> 
An: "Theo Diefenthal" <th...@scoop-software.de> 
CC: "user" <us...@flink.apache.org>, "yungao gy" <yu...@aliyun.com> 
Gesendet: Mittwoch, 13. Mai 2020 09:15:37 
Betreff: Re: 回复:Re: Writing _SUCCESS Files (Streaming and Batch) 

Hi, Theo 
Thank you for sharing your solution. 
From your description, it seems that what you need is a listener that could notify you of state changes of the partition/bucket: created/updated/closed (maybe you don't need the close notification).
I am not familiar with Impala. So what I want to know is why you need to be notified every time the partition gets new data. Would you like to give a more detailed description?

Best, 
Guowei 


Theo Diefenthal <theo.diefenthal@scoop-software.de> wrote on Wednesday, May 13, 2020 at 12:00 AM:



Hi Yun, 

For me, that sounds quite nice. I implemented the same for my application a few weeks ago, but of course tailored only to my app. 
What I did: 
1. I wrapped the Parquet-StreamingFileSink into a Process-Function. 
2. I extended the default ProcessOperator and instead of "notifyCheckpointComplete(long checkpointId)", I provided my WrappedProcessFunction a "notifyCheckpointComplete(checkpointId, lastCommitWatermark)".
3. I added a custom sink with parallelism 1 behind the WrappedProcessFunction. 
4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a message downstream to the parallelism 1 sink containing data about which partitions were written to in the phase since the last checkpoint.
5. In the parallelism 1 sink, I make sure that I get the messages from all upstream tasks (give the constructor an int parameter telling it the parallelism of the WrappedProcessFunction) and then perform my parallelism 1 operation, in my case, telling Impala which partitions were added or got new data. Luckily, in case of Impala, that operation can be made idempotent so I only needed to make sure that I have at-least-once processing from the state perspective here.

I had to go for notifyCheckpointComplete as only there, the parquet files are ultimately committed and thus available for spark, impala and so on. 

So if you go on with that issue, I'd be really happy to be able to customize the solution and e.g. get rid of my custom setup by only specifying kind of a lambda function which should be run with parallelism 1 and update impala. That function would however still need the info which partitions were updated/added.
And in my case, I was not really interested in the watermark (I sent it downstream only for metric purposes) but want to tell impala after each commit which partitions changed, regardless of the value from the watermark. 

Best regards 
Theo 


Von: "Yun Gao" < [ mailto:yungao.gy@aliyun.com | yungao.gy@aliyun.com ] > 
An: "Robert Metzger" < [ mailto:rmetzger@apache.org | rmetzger@apache.org ] >, "Jingsong Li" < [ mailto:jingsonglee0@gmail.com | jingsonglee0@gmail.com ] > 
CC: "Peter Groesbeck" < [ mailto:peter.groesbeck@gmail.com | peter.groesbeck@gmail.com ] >, "user" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Gesendet: Dienstag, 12. Mai 2020 10:36:59 
Betreff: 回复:Re: Writing _SUCCESS Files (Streaming and Batch) 

Hi Peter, 

Sorry for missing the question and responding late, I'm currently working together with Jingsong on the issue to support "global committing" (like writing _SUCCESS file or adding partitions to hive store) after buckets terminated. In 1.11 we may first support watermark/time related buckets in Table/SQL API, and we are also thinking of supporting "global committing" for arbitrary bucket assigner policy for StreamingFileSink users. The current rough thought is to let users specify when a bucket is terminated on a single task, and the OperatorCoordinator[1] of the sink will aggregate the information from all subtasks about this bucket and do the global committing if the bucket has been finished on all the subtasks, but this is still under thinking and discussion. Any thoughts or requirements on this issue are warmly welcome.

Best, 
Yun 


[1] OperatorCoordinator is introduced in FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface. This is a component that resides in the JobManager and could communicate with all the subtasks of the corresponding operator, thus it could be used to aggregate status from subtasks.


------------------ Original Message ------------------
From: Robert Metzger <rmetzger@apache.org>
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li <jingsonglee0@gmail.com>
Cc: Peter Groesbeck <peter.groesbeck@gmail.com>, user <user@flink.apache.org>
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter, 
I filed a ticket for this feature request: https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your thoughts / requirements to the ticket)

Best, 
Robert 


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsonglee0@gmail.com> wrote:

Hi Peter, 
The troublesome is how to know the "ending" for a bucket in streaming job. 
In 1.11, we are trying to implement a watermark-related bucket ending mechanism[1] in Table/SQL. 

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best, 
Jingsong Lee 

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <peter.groesbeck@gmail.com> wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and there is a requirement to generate an empty _SUCCESS file like the old Hadoop job. I have to implement a similar Batch job to read from backup files in case of outages or downtime. 

The Batch job question was answered here and appears to be still relevant although if someone could confirm for me that would be great. 
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here: 
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now that BucketingSink has been deprecated. 

Is there a way to implement a similar solution for StreamingFileSink? 

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future. 

Thank you, 
Peter 





-- 
Best, Jingsong Lee 



Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Theo
Thank you for sharing your solution.
From your description, it seems that what you need is a listener that could
notify you of state changes of the partition/bucket: created/updated/closed
(maybe you don't need the close notification).
I am not familiar with Impala. So what I want to know is why you need to be
notified every time the partition gets new data. Would you like to give
a more detailed description?

Best,
Guowei


Theo Diefenthal <th...@scoop-software.de> wrote on Wednesday, May 13, 2020 at 12:00 AM:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
> 2. I extended the default ProcessOperator and instead of
> "notifyCheckpointComplete(long checkpointId)", I provided my
> WrappedProcessFunction a "notifyCheckpointComplete(checkpointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism 1 sink containing data about which
> partitions were written to in the phase since the last checkpoint.
> 5. In the parallelism 1 sink, I make sure that I get the messages from all
> upstream tasks (give the constructor an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my parallelism
> 1 operation, in my case, telling Impala which partitions were added or got
> new data. Luckily, in case of Impala, that operation can be made idempotent
> so I only needed to make sure that I have at-least-once processing from
> the state perspective here.
>
> I had to go for notifyCheckpointComplete as only there, the parquet files
> are ultimately committed and thus available for spark, impala and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifying kind of a lambda function which should be run with parallelism
> 1 and update impala. That function would however still need the info which
> partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell impala after each
> commit which partitions changed, regardless of the value from the watermark.
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Yun Gao" <yu...@aliyun.com>
> *An: *"Robert Metzger" <rm...@apache.org>, "Jingsong Li" <
> jingsonglee0@gmail.com>
> *CC: *"Peter Groesbeck" <pe...@gmail.com>, "user" <
> user@flink.apache.org>
> *Gesendet: *Dienstag, 12. Mai 2020 10:36:59
> *Betreff: *回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
>     Sorry for missing the question and responding late, I'm currently
> working together with Jingsong on the issue to support "global committing"
> (like writing _SUCCESS file or adding partitions to hive store) after
> buckets terminated. In 1.11 we may first support watermark/time related
> buckets in Table/SQL API, and we are also thinking of supporting "global
> committing" for arbitrary bucket assigner policy for StreamingFileSink
> users. The current rough thought is to let users specify when a bucket is
> terminated on a single task, and the OperatorCoordinator[1] of the sink
> will aggregate the information from all subtasks about this bucket and do
> the global committing if the bucket has been finished on all the subtasks,
> but this is still under thinking and discussion. Any thoughts or
> requirements on this issue are warmly welcome.
>
> Best,
>  Yun
>
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
> This is a component that resides in the JobManager and could communicate with all
> the subtasks of the corresponding operator, thus it could be used to
> aggregate status from subtasks.
>
> ------------------ Original Message ------------------
> *From: *Robert Metzger <rm...@apache.org>
> *Sent: *Tue May 12 15:36:26 2020
> *To: *Jingsong Li <ji...@gmail.com>
> *Cc: *Peter Groesbeck <pe...@gmail.com>, user <
> user@flink.apache.org>
> *Subject: *Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Hi Peter,
>>> The troublesome part is how to know the "ending" of a bucket in a
>>> streaming job.
>>> In 1.11, we are trying to implement a watermark-related bucket ending
>>> mechanism[1] in Table/SQL.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <
>>> peter.groesbeck@gmail.com> wrote:
>>>
>>>> I am replacing an M/R job with a Streaming job using the
>>>> StreamingFileSink and there is a requirement to generate an empty _SUCCESS
>>>> file like the old Hadoop job. I have to implement a similar Batch job to
>>>> read from backup files in case of outages or downtime.
>>>>
>>>> The Batch job question was answered here and appears to be still
>>>> relevant although if someone could confirm for me that would be great.
>>>> https://stackoverflow.com/a/39413810
>>>>
>>>> The question of the Streaming job came up back in 2018 here:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E
>>>>
>>>> But the solution to use or extend the BucketingSink class seems out of
>>>> date now that BucketingSink has been deprecated.
>>>>
>>>> Is there a way to implement a similar solution for StreamingFileSink?
>>>>
>>>> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
>>>> future.
>>>>
>>>> Thank you,
>>>> Peter
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Peter Groesbeck <pe...@gmail.com>.
Robert, Yun, and Theo,

Thanks for the responses! I'm very much looking forward to upgrading once
those changes are made.

I hacked this in myself but likely in a much less elegant way than Theo.
For anybody who is curious - I extended the DateTimeBucketAssigner class
and overrode the getBucketId method so that I could use the AWS SDK to
directly write an empty success file whenever a new bucket was opened. I
keep a Map of the last bucket that I've already written to, as well as the
current bucket, so that I don't overwhelm S3 with requests (getBucketId
appears to be called quite frequently).

It's far from ideal but it's performing quite nicely and generally I'm not
seeing any significant race conditions between files closing in the
previous bucket and opening in the next. I of course only write the
_SUCCESS file to the previous bucket not the current one.
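
In case it helps anyone, the shape of it is roughly this (heavily simplified
compared to what I actually run; the S3 bucket and key prefix are placeholders,
and the bookkeeping Map and error handling are left out):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// Whenever getBucketId switches to a new bucket, drop an empty _SUCCESS marker
// into the bucket we just left. The put is idempotent, so repeating it after a
// restart does no harm.
public class SuccessMarkingBucketAssigner<IN> extends DateTimeBucketAssigner<IN> {

  private transient AmazonS3 s3;
  private transient String currentBucketId;

  public SuccessMarkingBucketAssigner(String formatString) {
    super(formatString);
  }

  @Override
  public String getBucketId(IN element, BucketAssigner.Context context) {
    String bucketId = super.getBucketId(element, context);
    if (s3 == null) {
      s3 = AmazonS3ClientBuilder.defaultClient();
    }
    if (currentBucketId != null && !currentBucketId.equals(bucketId)) {
      // Only the previous bucket gets the marker, never the one still being written.
      s3.putObject("my-s3-bucket", "events/" + currentBucketId + "/_SUCCESS", "");
    }
    currentBucketId = bucketId;
    return bucketId;
  }
}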

I will keep an eye on that ticket, and very much appreciate the effort!

Warm regards,
Peter

On Tue, May 12, 2020 at 11:00 AM Theo Diefenthal <
theo.diefenthal@scoop-software.de> wrote:

> Hi Yun,
>
> For me, that sounds quite nice. I implemented the same for my application
> a few weeks ago, but of course tailored only to my app.
> What I did:
> 1. I wrapped the Parquet-StreamingFileSink into a Process-Function.
> 2. I extended the default ProcessOperator and instead of
> "notifyCheckpointComplete(long checkpointId)", I provided my
> WrappedProcessFunction a "notifyCheckpointComplete(checkpointId,
> lastCommitWatermark)".
> 3. I added a custom sink with parallelism 1 behind the
> WrappedProcessFunction.
> 4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a
> message downstream to the parallelism 1 sink containing data about which
> partitions were written to in the phase since the last checkpoint.
> 5. In the parallelism 1 sink, I make sure that I get the messages from all
> upstream tasks (give the constructor an int parameter telling it the
> parallelism of the WrappedProcessFunction) and then perform my parallelism
> 1 operation, in my case, telling Impala which partitions were added or got
> new data. Luckily, in case of Impala, that operation can be made idempotent
> so I only needed to make sure that I have at-least-once processing from
> the state perspective here.
>
> I had to go for notifyCheckpointComplete as only there, the parquet files
> are ultimately committed and thus available for spark, impala and so on.
>
> So if you go on with that issue, I'd be really happy to be able to
> customize the solution and e.g. get rid of my custom setup by only
> specifying kind of a lambda function which should be run with parallelism
> 1 and update impala. That function would however still need the info which
> partitions were updated/added.
> And in my case, I was not really interested in the watermark (I sent it
> downstream only for metric purposes) but want to tell impala after each
> commit which partitions changed, regardless of the value from the watermark.
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Yun Gao" <yu...@aliyun.com>
> *An: *"Robert Metzger" <rm...@apache.org>, "Jingsong Li" <
> jingsonglee0@gmail.com>
> *CC: *"Peter Groesbeck" <pe...@gmail.com>, "user" <
> user@flink.apache.org>
> *Gesendet: *Dienstag, 12. Mai 2020 10:36:59
> *Betreff: *回复:Re: Writing _SUCCESS Files (Streaming and Batch)
>
> Hi Peter,
>
>     Sorry for missing the question and responding late, I'm currently
> working together with Jingsong on the issue to support "global committing"
> (like writing _SUCCESS file or adding partitions to hive store) after
> buckets terminated. In 1.11 we may first support watermark/time related
> buckets in Table/SQL API, and we are also thinking of supporting "global
> committing" for arbitrary bucket assigner policy for StreamingFileSink
> users. The current rough thought is to let users specify when a bucket is
> terminated on a single task, and the OperatorCoordinator[1] of the sink
> will aggregate the information from all subtasks about this bucket and do
> the global committing if the bucket has been finished on all the subtasks,
> but this is still under thinking and discussion. Any thoughts or
> requirements on this issue are warmly welcome.
>
> Best,
>  Yun
>
>
> [1] OperatorCoordinator is introduced in FLIP-27:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface.
> This is a component that resides in the JobManager and could communicate with all
> the subtasks of the corresponding operator, thus it could be used to
> aggregate status from subtasks.
>
> ------------------ Original Message ------------------
> *From: *Robert Metzger <rm...@apache.org>
> *Sent: *Tue May 12 15:36:26 2020
> *To: *Jingsong Li <ji...@gmail.com>
> *Cc: *Peter Groesbeck <pe...@gmail.com>, user <
> user@flink.apache.org>
> *Subject: *Re: Writing _SUCCESS Files (Streaming and Batch)
>
>> Hi Peter,
>> I filed a ticket for this feature request:
>> https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
>> thoughts / requirements to the ticket)
>>
>> Best,
>> Robert
>>
>>
>> On Wed, May 6, 2020 at 3:41 AM Jingsong Li <ji...@gmail.com>
>> wrote:
>>
>>> Hi Peter,
>>> The troublesome part is how to know the "ending" of a bucket in a
>>> streaming job.
>>> In 1.11, we are trying to implement a watermark-related bucket ending
>>> mechanism[1] in Table/SQL.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <
>>> peter.groesbeck@gmail.com> wrote:
>>>
>>>> I am replacing an M/R job with a Streaming job using the
>>>> StreamingFileSink and there is a requirement to generate an empty _SUCCESS
>>>> file like the old Hadoop job. I have to implement a similar Batch job to
>>>> read from backup files in case of outages or downtime.
>>>>
>>>> The Batch job question was answered here and appears to be still
>>>> relevant although if someone could confirm for me that would be great.
>>>> https://stackoverflow.com/a/39413810
>>>>
>>>> The question of the Streaming job came up back in 2018 here:
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E
>>>>
>>>> But the solution to use or extend the BucketingSink class seems out of
>>>> date now that BucketingSink has been deprecated.
>>>>
>>>> Is there a way to implement a similar solution for StreamingFileSink?
>>>>
>>>> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
>>>> future.
>>>>
>>>> Thank you,
>>>> Peter
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Yun, 

For me, that sounds quite nice. I implemented the same for my application a few weeks ago, but of course tailored only to my app. 
What I did: 
1. I wrapped the Parquet-StreamingFileSink into a Process-Function. 
2. I extended the default ProcessOperator and instead of "notifyCheckpointComplete(long checkpointId)", I provided my WrappedProcessFunction a "notifyCheckpointComplete(checkpointId, lastCommitWatermark)".
3. I added a custom sink with parallelism 1 behind the WrappedProcessFunction. 
4. From my WrappedProcessFunction, in notifyCheckpointComplete, I send a message downstream to the parallelism 1 sink containing data about which partitions were written to in the phase since the last checkpoint.
5. In the parallelism 1 sink, I make sure that I get the messages from all upstream tasks (give the constructor an int parameter telling it the parallelism of the WrappedProcessFunction) and then perform my parallelism 1 operation, in my case, telling Impala which partitions were added or got new data. Luckily, in case of Impala, that operation can be made idempotent so I only needed to make sure that I have at-least-once processing from the state perspective here.

I had to go for notifyCheckpointComplete as only there, the parquet files are ultimately committed and thus available for spark, impala and so on. 
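
In code, the operator part of step 2 looks roughly like this (simplified sketch,
not the exact classes we run; WrappedProcessFunction here is only a minimal
stand-in for the wrapper from step 1):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

// Minimal stand-in for the wrapper from step 1: a ProcessFunction around the
// StreamingFileSink that also wants to hear about completed checkpoints.
abstract class WrappedProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {
  abstract void notifyCheckpointComplete(long checkpointId, long lastCommitWatermark) throws Exception;
}

// Step 2: forwards checkpoint-complete notifications, together with the last
// seen watermark, to the wrapped function, which then emits the "these
// partitions were written" message towards the parallelism 1 sink (step 4).
public class NotifyingProcessOperator<IN, OUT> extends ProcessOperator<IN, OUT> {

  private long lastSeenWatermark = Long.MIN_VALUE;

  public NotifyingProcessOperator(WrappedProcessFunction<IN, OUT> function) {
    super(function);
  }

  @Override
  public void processWatermark(Watermark mark) throws Exception {
    super.processWatermark(mark);
    lastSeenWatermark = mark.getTimestamp();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    ((WrappedProcessFunction<IN, OUT>) userFunction)
        .notifyCheckpointComplete(checkpointId, lastSeenWatermark);
  }
}

The operator is wired in via stream.transform(...) instead of stream.process(...), and the parallelism 1 sink from steps 3 and 5 sits directly behind it.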

So if you go on with that issue, I'd be really happy to be able to customize the solution and e.g. get rid of my custom setup by only specifying kind of a lambda function which should be run with parallelism 1 and update impala. That function would however still need the info which partitions were updated/added.
And in my case, I was not really interested in the watermark (I sent it downstream only for metric purposes) but want to tell impala after each commit which partitions changed, regardless of the value from the watermark. 

Best regards 
Theo 


Von: "Yun Gao" <yu...@aliyun.com> 
An: "Robert Metzger" <rm...@apache.org>, "Jingsong Li" <ji...@gmail.com> 
CC: "Peter Groesbeck" <pe...@gmail.com>, "user" <us...@flink.apache.org> 
Gesendet: Dienstag, 12. Mai 2020 10:36:59 
Betreff: 回复:Re: Writing _SUCCESS Files (Streaming and Batch) 

Hi Peter, 

Sorry for missing the question and responding late, I'm currently working together with Jingsong on the issue to support "global committing" (like writing _SUCCESS file or adding partitions to hive store) after buckets terminated. In 1.11 we may first support watermark/time related buckets in Table/SQL API, and we are also thinking of supporting "global committing" for arbitrary bucket assigner policy for StreamingFileSink users. The current rough thought is to let users specify when a bucket is terminated on a single task, and the OperatorCoordinator[1] of the sink will aggregate the information from all subtasks about this bucket and do the global committing if the bucket has been finished on all the subtasks, but this is still under thinking and discussion. Any thoughts or requirements on this issue are warmly welcome.

Best, 
Yun 


[1] OperatorCoordinator is introduced in FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface. This is a component that resides in the JobManager and could communicate with all the subtasks of the corresponding operator, thus it could be used to aggregate status from subtasks.




------------------ Original Message ------------------
From: Robert Metzger <rm...@apache.org>
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li <ji...@gmail.com>
Cc: Peter Groesbeck <pe...@gmail.com>, user <us...@flink.apache.org>
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter, 
I filed a ticket for this feature request: https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your thoughts / requirements to the ticket)

Best, 
Robert 


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <jingsonglee0@gmail.com> wrote:

Hi Peter, 
The troublesome part is how to know the "ending" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket ending mechanism[1] in Table/SQL. 

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best, 
Jingsong Lee 

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <peter.groesbeck@gmail.com> wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and there is a requirement to generate an empty _SUCCESS file like the old Hadoop job. I have to implement a similar Batch job to read from backup files in case of outages or downtime. 

The Batch job question was answered here and appears to be still relevant although if someone could confirm for me that would be great. 
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here: 
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now that BucketingSink has been deprecated. 

Is there a way to implement a similar solution for StreamingFileSink? 

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future. 

Thank you, 
Peter 





-- 
Best, Jingsong Lee 



Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Yun Gao <yu...@aliyun.com>.
Hi Peter,

    Sorry for missing the question and responding late, I'm currently working together with Jingsong on the issue to support "global committing" (like writing _SUCCESS file or adding partitions to hive store) after buckets terminated. In 1.11 we may first support watermark/time related buckets in Table/SQL API, and we are also thinking of supporting "global committing" for arbitrary bucket assigner policy for StreamingFileSink users. The current rough thought is to let users specify when a bucket is terminated on a single task, and the OperatorCoordinator[1] of the sink will aggregate the information from all subtasks about this bucket and do the global committing if the bucket has been finished on all the subtasks, but this is still under thinking and discussion. Any thoughts or requirements on this issue are warmly welcome.

Best,
 Yun


[1] OperatorCoordinator is introduced in FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface. This is a component that resides in the JobManager and could communicate with all the subtasks of the corresponding operator, thus it could be used to aggregate status from subtasks.
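
To make the rough thought a bit more concrete, the user-facing side could look
something like the sketch below (purely illustrative; these names are made up
and are not an existing Flink API):

// Illustrative only -- not an existing Flink interface. The user decides per
// subtask when a bucket is "terminated"; the sink's OperatorCoordinator collects
// these per-subtask signals and runs the global commit (e.g. writing _SUCCESS or
// adding a Hive partition) once every subtask has reported the bucket as finished.
public interface GlobalCommitPolicy<BucketID> {

  // Called on each subtask; return true once this subtask will not write to the
  // bucket anymore (e.g. the watermark has passed the bucket's end time).
  boolean isBucketTerminated(BucketID bucketId, long currentWatermark);

  // Called once on the coordinator after all subtasks reported the bucket as terminated.
  void commitGlobally(BucketID bucketId) throws Exception;
}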


 ------------------ Original Message ------------------
From: Robert Metzger <rm...@apache.org>
Sent: Tue May 12 15:36:26 2020
To: Jingsong Li <ji...@gmail.com>
Cc: Peter Groesbeck <pe...@gmail.com>, user <us...@flink.apache.org>
Subject: Re: Writing _SUCCESS Files (Streaming and Batch)

Hi Peter,

I filed a ticket for this feature request: https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your thoughts / requirements to the ticket)

Best,
Robert


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <ji...@gmail.com> wrote:

Hi Peter,

The troublesome part is how to know the "ending" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket ending mechanism[1] in Table/SQL.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee
On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <pe...@gmail.com> wrote:

I am replacing an M/R job with a Streaming job using the StreamingFileSink and there is a requirement to generate an empty _SUCCESS file like the old Hadoop job. I have to implement a similar Batch job to read from backup files in case of outages or downtime.

The Batch job question was answered here and appears to be still relevant although if someone could confirm for me that would be great.
https://stackoverflow.com/a/39413810

The question of the Streaming job came up back in 2018 here:
http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E

But the solution to use or extend the BucketingSink class seems out of date now that BucketingSink has been deprecated.

Is there a way to implement a similar solution for StreamingFileSink?

I'm currently on 1.8.1 although I hope to update to 1.10 in the near future.

Thank you,
Peter

-- 
Best, Jingsong Lee

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Robert Metzger <rm...@apache.org>.
Hi Peter,

I filed a ticket for this feature request:
https://issues.apache.org/jira/browse/FLINK-17627 (feel free to add your
thoughts / requirements to the ticket)

Best,
Robert


On Wed, May 6, 2020 at 3:41 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi Peter,
>
> The troublesome part is how to know the "ending" of a bucket in a streaming job.
> In 1.11, we are trying to implement a watermark-related bucket ending
> mechanism[1] in Table/SQL.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
> Best,
> Jingsong Lee
>
> On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <pe...@gmail.com>
> wrote:
>
>> I am replacing an M/R job with a Streaming job using the
>> StreamingFileSink and there is a requirement to generate an empty _SUCCESS
>> file like the old Hadoop job. I have to implement a similar Batch job to
>> read from backup files in case of outages or downtime.
>>
>> The Batch job question was answered here and appears to be still relevant
>> although if someone could confirm for me that would be great.
>> https://stackoverflow.com/a/39413810
>>
>> The question of the Streaming job came up back in 2018 here:
>>
>> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E
>>
>> But the solution to use or extend the BucketingSink class seems out of
>> date now that BucketingSink has been deprecated.
>>
>> Is there a way to implement a similar solution for StreamingFileSink?
>>
>> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
>> future.
>>
>> Thank you,
>> Peter
>>
>
>
> --
> Best, Jingsong Lee
>

Re: Writing _SUCCESS Files (Streaming and Batch)

Posted by Jingsong Li <ji...@gmail.com>.
Hi Peter,

The troublesome part is how to know the "ending" of a bucket in a streaming job.
In 1.11, we are trying to implement a watermark-related bucket ending
mechanism[1] in Table/SQL.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Tue, May 5, 2020 at 7:40 AM Peter Groesbeck <pe...@gmail.com>
wrote:

> I am replacing an M/R job with a Streaming job using the StreamingFileSink
> and there is a requirement to generate an empty _SUCCESS file like the old
> Hadoop job. I have to implement a similar Batch job to read from backup
> files in case of outages or downtime.
>
> The Batch job question was answered here and appears to be still relevant
> although if someone could confirm for me that would be great.
> https://stackoverflow.com/a/39413810
>
> The question of the Streaming job came up back in 2018 here:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CFF74EED5-602F-4EAA-9BC1-6CDF56611267@gmail.com%3E
>
> But the solution to use or extend the BucketingSink class seems out of
> date now that BucketingSink has been deprecated.
>
> Is there a way to implement a similar solution for StreamingFileSink?
>
> I'm currently on 1.8.1 although I hope to update to 1.10 in the near
> future.
>
> Thank you,
> Peter
>


-- 
Best, Jingsong Lee