Posted to user@flink.apache.org by "Teunissen, F.G.J. (Fred)" <Fr...@ing.com> on 2020/09/07 15:39:50 UTC

Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

Hi All,

My Flink job uses bounded input sources and writes the results to a StreamingFileSink.
When it has processed all the input, the job finishes and closes. But the output files are still
named “<prefix>-0-0.<ext>.inprogress.<guid>”. I expected them to be named “<prefix>-0-0.<ext>”.

Did I forget some setting or something else?
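
For context, here is a stripped-down sketch of what the job roughly does
(the class name, the dummy source and the output path are just placeholders,
the real sources are different but all bounded):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BoundedStreamingFileSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // bounded source: a finite collection, so the job terminates on its own
        env.fromElements("a", "b", "c")
           .addSink(StreamingFileSink
               .forRowFormat(new Path("file:///tmp/out"),
                             new SimpleStringEncoder<String>("UTF-8"))
               .build());

        env.execute("bounded-streaming-file-sink-job");
    }
}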

Regards,
Fred


-----------------------------------------------------------------
ATTENTION:
The information in this e-mail is confidential and only meant for the intended recipient. If you are not the intended recipient, don't use or disclose it in any way. Please let the sender know and delete the message immediately.
-----------------------------------------------------------------

Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

this is indeed the correct behaviour right now, which doesn't mean that
it's the behaviour we would like to have.

The reason why we can't move the "pending" files to "final" is that we 
don't have a point where we can do this in an idempotent and retryable 
fashion. When we do regular STREAMING execution we do checkpoints and 
when the checkpoints complete we can move pending files. If this fails, 
it will be retried because we still have all the information we need for 
that in the checkpoint. (It's basically a two-phase commit protocol). 
When a bounded STREAMING program simply finishes, we don't have a point 
where we can do that. A colleague of mine (Yun in cc) is actually 
working on a proposal to do one "final checkpoint" for exactly this.
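
Just to illustrate the mechanism in the unbounded STREAMING case: once
checkpointing is enabled, every completed checkpoint commits the files
that were pending at that point, roughly like this (the interval is only
an example, and the rest of the pipeline is omitted):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// each completed checkpoint moves the StreamingFileSink's pending files to "finished"
env.enableCheckpointing(60_000L); // e.g. one checkpoint per minute

// ... build the pipeline with the StreamingFileSink as usual ...

This doesn't help with the records that arrive after the last checkpoint
of a bounded job, though, which is exactly the gap that the "final
checkpoint" proposal is meant to close.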

We're also working on better support for bounded programs on the 
DataStream API, I'll try and summarise this below.

A couple of colleagues and I are currently thinking about how we can 
bring support for good BATCH execution to the DataStream API. The 
starting point is https://s.apache.org/FLIP-131 which discusses eventual 
deprecation of the DataSet API, followed by 
https://s.apache.org/FLIP-134 which outlines the semantics of BATCH 
execution on the DataStream API, and https://s.apache.org/FLIP-140 which 
discusses improved runtime behaviour for BATCH programs on the 
DataStream API.
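
To give a rough idea of the direction of FLIP-134: the plan is to let you
pick the execution mode per job, along these lines (the exact API is not
final yet, so take this only as a sketch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// proposed: run the same DataStream program with BATCH semantics
// (stage-by-stage scheduling, no checkpoints, results committed at the end)
env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);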

The last piece of the puzzle will be sinks, which also need to work well 
for both BATCH and STREAMING programs on the DataStream API. We're 
expecting to publish a FLIP for this shortly.

Best,
Aljoscha

On 07.09.20 19:29, Ken Krugler wrote:
> Hi Fred,
> 
> I think this is the current behavior (though it would be helpful to know which version of Flink you’re using).
> 
> From an email conversation with Kostas in January of this year:
> 
>> Hi Ken, Jingsong and Li,
>>
>> Sorry for the late reply.
>>
>> As Jingsong pointed out, upon calling close() the StreamingFileSink
>> does not commit the in-progress/pending files.
>> The reason for this is that the close() method of any UDF including
>> sink functions is called on both normal termination and termination
>> due to failure.
>> Given this, we cannot commit the files, because in case of failure
>> they should be reverted.
>>
>> Actually we are currently updating the StreamingFileSink docs to
>> include this among other things.
>> Also the differentiation between normal termination and termination
>> due to failure will hopefully be part of Flink 1.11 and
>> this is the FLIP to check
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>>
>> Cheers,
>> Kostas
> 
> Though it looks like FLIP-46 is still under discussion, and thus 1.11 doesn’t have a fix for this?
> 
> — Ken
> 
>> On Sep 7, 2020, at 8:39 AM, Teunissen, F.G.J. (Fred) <Fred.Teunissen@ing.com> wrote:
>>
>> Hi All,
>>   
>> My flink-job is using bounded input sources and writes the results to a StreamingFileSink.
>> When it has processed all the input the job is finished and closes. But the output files are still
>> named “<prefix>-0-0.<ext>.inprogress.<guid>”. I expected them to be named “<prefix>-0-0.<ext>”.
>>
>> Did I forget some setting or something else?
>>   
>> Regards,
>> Fred
>>   
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 
> 
> 


Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

Posted by Ken Krugler <kk...@transpac.com>.
Hi Fred,

I think this is the current behavior (though it would be helpful to know which version of Flink you’re using).

From an email conversation with Kostas in January of this year:

> Hi Ken, Jingsong and Li,
> 
> Sorry for the late reply.
> 
> As Jingsong pointed out, upon calling close() the StreamingFileSink
> does not commit the in-progress/pending files.
> The reason for this is that the close() method of any UDF including
> sink functions is called on both normal termination and termination
> due to failure.
> Given this, we cannot commit the files, because in case of failure
> they should be reverted.
> 
> Actually we are currently updating the StreamingFileSink docs to
> include this among other things.
> Also the differentiation between normal termination and termination
> due to failure will hopefully be part of Flink 1.11 and
> this is the FLIP to check
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
> 
> Cheers,
> Kostas

Though it looks like FLIP-46 is still under discussion, and thus 1.11 doesn’t have a fix for this?

— Ken

> On Sep 7, 2020, at 8:39 AM, Teunissen, F.G.J. (Fred) <Fred.Teunissen@ing.com> wrote:
> 
> Hi All,
>  
> My flink-job is using bounded input sources and writes the results to a StreamingFileSink.
> When it has processed all the input the job is finished and closes. But the output files are still
> named “<prefix>-0-0.<ext>.inprogress.<guid>”. I expected them to be named “<prefix>-0-0.<ext>”.
> 
> Did I forget some setting or something else?
>  
> Regards,
> Fred
>  

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


