Posted to user@flink.apache.org by Benoit MERIAUX <bm...@octo.com> on 2018/08/21 21:45:08 UTC

Some questions about the StreamingFileSink

Hi,

I have some questions about the new StreamingFileSink in 1.6.

My use case is pretty simple.
I have a Cassandra table with 150 million rows.
They are partitioned into buckets of 100,000 rows.

My job is to export each "bucket" to a file (1 bucket = 1 file), so the job
is designed like this:

The source emits the bucket list,
then a flatMap task fetches the rows matching each bucket and sends all
100,000 rows from Cassandra to the collector,
then a StreamingFileSink writes each row into one file per bucket (row format).

Checkpointing is enabled, every 10s.
The rolling policy is OnCheckpointRollingPolicy, and the bucket assigner
assigns buckets by bucketId (my bucketId, not the sink's one :).
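
For reference, the wiring looks roughly like this. This is only a sketch,
not the real code: ExportRow, CassandraBucketSource, FetchBucketRows and
BucketIdAssigner are placeholders for my own classes, imports are omitted,
and the builder method names may differ slightly depending on the exact
Flink version.

    // Sketch of the job wiring (placeholders for my own classes, values simplified).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

    StreamingFileSink<ExportRow> sink = StreamingFileSink
            .forRowFormat(new Path("file:///tmp/export"), new SimpleStringEncoder<ExportRow>("UTF-8"))
            .withBucketAssigner(new BucketIdAssigner())            // bucket = my bucketId, one directory per bucket
            .withRollingPolicy(OnCheckpointRollingPolicy.build())  // roll part files only on checkpoints
            .build();

    env.addSource(new CassandraBucketSource())   // emits the bucketIds to export
       .flatMap(new FetchBucketRows())           // fetches the ~100,000 rows of one bucket from Cassandra
       .addSink(sink);

    env.execute("cassandra bucket export");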

My problem is that at the end of the job, I only have in-progress part files
for each bucket.

I do not understand how I can trigger the finalization of the sink and have
the bucket part files committed.

So I read the code of the StreamingFileSink and the Bucket classes.

If I have understood correctly, the in-progress bucket files can be closed
(closePartFile method of the Bucket) and moved to "pending" state according
to the rolling policy, where they wait for a checkpoint to be moved to
"finished" state.

So the rolling policy can roll part files on every record, on the bucket
check interval, or on each checkpoint.
In my case, with OnCheckpointRollingPolicy, it rolls only on each
checkpoint. Am I right?

And when a checkpoint is successful, all the pending files are moved to
"finished" state and become usable by other jobs.

Then this is where I start to get lost.

Indeed, one thing surprised me in the code of the close method of the
StreamingFileSink.
It discards
<https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L293>
all active buckets since the last successful checkpoint!
But at the end of a successful job, no checkpoint is triggered automatically
if the minimal interval since the last checkpoint has not expired, so what
happens to the data written since the last checkpoint?
(-> Is this sink only for endless streams?)

How do I get all my files, with all my data, into "finished" state when my
job finishes successfully?
Do I need to trigger a checkpoint manually?
Is there a better fitting sink for my use case?
Should I use another rolling policy? Even with the bucket interval, there is
still a window between the last roll and the end of the job during which
some part files are not closed and committed.

Even in the case of an endless stream, I suggest improving the behavior of
the close method by calling closePartFile on each active bucket, so that all
valid data since the last checkpoint can be moved to pending state, waiting
for the checkpoint at the end of the job.
This seems to be what the BucketingSink does:
<https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L437>
-> I can do a PR for this.
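
Very roughly, the behavior I have in mind for the sink's close() would look
like the sketch below. This is pseudocode-level Java based on my reading of
the 1.6 Buckets/Bucket classes: the activeBuckets field, the getBucketId()
accessor, the logger and the visibility of closePartFile() are assumptions
on my side, not the actual signatures.

    // Sketch of the suggestion, not a real patch: on close(), move the in-progress
    // part file of every active bucket to "pending" instead of discarding it, so the
    // data written since the last checkpoint can still be committed by a final checkpoint.
    public void close() {
        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
            try {
                bucket.closePartFile();  // in-progress -> pending, instead of being deleted
            } catch (Exception e) {
                LOG.warn("Could not close the in-progress part file of bucket {}.", bucket.getBucketId(), e);
            }
        }
    }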


I'm open to all suggestions :)

regards,

-- 
Benoit MERIAUX
OCTO Technology
Consultant Confirmé - Tribu NAD
.................................................
34, Avenue de l'Opéra
75002 Paris
+33 (0)786 59 12 30
www.octo.com - blog.octo.com
www.usievents.com
@USIEvents - @OCTOTechnology
.................................................

Re: Some questions about the StreamingFileSink

Posted by Benoit MERIAUX <bm...@octo.com>.
Thanks for the detailed answer.
So the current behavior is intentional, due to the legacy close() which does
not distinguish between a successful and a failed termination of the sink.
The workaround is then to use a short inactivity/bucket interval so the last
received data gets closed, and to wait for the next checkpoint (but how do I
do that if my job is already finished)?

Is there any other solution, like using another sink?

Re: Some questions about the StreamingFileSink

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Kostas,

Sorry for jumping in on this discussion :)

What you suggest for finite sources, waiting for checkpoints, is pretty ugly
in many cases, especially if you would otherwise read from a finite source
(a file for instance) and want to end the job asap.

Would it make sense to not discard and delete the buckets, just keep the
files? They could be marked somewhat differently from pending files but at
least they would still be accessible for users in these cases.

Gyula

Re: Some questions about the StreamingFileSink

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Benoit,

Thanks for using the StreamingFileSink. My answers/explanations are inlined.
In most of your observations, you are correct.

> On Aug 21, 2018, at 11:45 PM, Benoit MERIAUX <bm...@octo.com> wrote:
> 
> Hi,
> 
> I have some questions about the new StreamingFileSink in 1.6.
> 
> My use case is pretty simple.
> I have a Cassandra table with 150 million rows.
> They are partitioned into buckets of 100,000 rows.
> 
> My job is to export each "bucket" to a file (1 bucket = 1 file), so the job is designed like this:
> 
> The source emits the bucket list,
> then a flatMap task fetches the rows matching each bucket and sends all 100,000 rows from Cassandra to the collector,
> then a StreamingFileSink writes each row into one file per bucket (row format).
> 
> Checkpointing is enabled, every 10s.
> The rolling policy is OnCheckpointRollingPolicy, and the bucket assigner assigns buckets by bucketId (my bucketId, not the sink's one :).
> 
> My problem is that at the end of the job, I only have in-progress part files for each bucket.
> 
> I do not understand how I can trigger the finalization of the sink and have the bucket part files committed.
> 
> So I read the code of the StreamingFileSink and the Bucket classes.
> 
> If I have understood correctly, the in-progress bucket files can be closed (closePartFile method of the Bucket) and moved to "pending" state according to the rolling policy, where they wait for a checkpoint to be moved to "finished" state.
> 

1) The in-progress files are closed based on the rolling policy and they are put in pending state, 
2) they are staged for commit when the “next” checkpoint arrives after they are put to pending state, and 
3) they are published when that checkpoint is declared successful.

> So the rolling policy can roll part files on every record, on the bucket check interval, or on each checkpoint.
> In my case, with OnCheckpointRollingPolicy, it rolls only on each checkpoint. Am I right?
> 

The OnCheckpointRollingPolicy rolls the in-progress file when it receives a checkpoint, and publishes the file when the checkpoint is complete.

But in general, you can specify your own rolling policy when using row formats.
There, you can specify to roll:
1) based on an inactivity interval or interval since the in-progress file was created 
2) based on size of the file
3) on every checkpoint
4) a combination of the above

I would recommend checking the DefaultRollingPolicy for an example.
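
For instance, something along these lines (just a sketch: MyEvent is a
placeholder for the sink's input type, the thresholds are arbitrary, and the
builder method names may vary slightly across Flink versions):

    // Sketch of a custom rolling policy for row formats (example thresholds only).
    DefaultRollingPolicy<MyEvent, String> rollingPolicy = DefaultRollingPolicy
            .create()
            .withInactivityInterval(60_000L)       // roll if nothing was written for 1 minute
            .withRolloverInterval(15 * 60_000L)    // roll at the latest 15 minutes after creation
            .withMaxPartSize(128L * 1024L * 1024L) // roll once the part file reaches 128 MB
            .build();

You then pass it to the sink builder via withRollingPolicy(rollingPolicy).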

> And when a checkpoint is successful, all the pending files are moved to "finished" state and become usable by other jobs.
> 
> Then this is where I start to get lost.
> 
> Indeed, one thing surprised me in the code of the close method of the StreamingFileSink.
> It discards <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java#L293> all active buckets since the last successful checkpoint!
> But at the end of a successful job, no checkpoint is triggered automatically if the minimal interval since the last checkpoint has not expired, so what happens to the data written since the last checkpoint?
> (-> Is this sink only for endless streams?)
> 

You are correct here that this is an undesired behavior for jobs with finite input.

The reason is that, for legacy reasons, the close() method is called for both normal (i.e. successful) and abnormal (i.e. failure) termination of the job.
Given this, we could not simply declare the pending files as valid because in case of a failure, we would have inconsistencies. 
So we went for the conservative approach which simply discards the files.

> How do I get all my files, with all my data, into "finished" state when my job finishes successfully?
> Do I need to trigger a checkpoint manually?
> Is there a better fitting sink for my use case?
> Should I use another rolling policy? Even with the bucket interval, there is still a window between the last roll and the end of the job during which some part files are not closed and committed.

What you can do is use the default policy and specify an inactivity interval; this will make sure that in-progress files are closed. Then you wait for
some checkpoints to come and succeed before canceling the job. I know and I agree that this is not the most elegant solution, but until the root
problem is fixed, I think it is the best one.

I hope this helps,
Kostas

> 
> Even in the case of an endless stream, I suggest improving the behavior of the close method by calling closePartFile on each active bucket, so that all valid data since the last checkpoint can be moved to pending state, waiting for the checkpoint at the end of the job.
> This seems to be what the BucketingSink does: <https://github.com/apache/flink/blob/852502b7d51f91c6f9c3479424516d0b9ae255e5/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L437>
> -> I can do a PR for this
> 
> 
> I'm open to all suggestions :)
> 
> regards,
> 