Posted to user@flink.apache.org by Kaustubh Rudrawar <ka...@box.com> on 2019/02/06 06:47:02 UTC

Exactly Once Guarantees with StreamingFileSink to S3

Hi,

I'm trying to understand the exactly-once semantics of the
StreamingFileSink with S3 in Flink 1.7.1 and am a bit confused about how it
guarantees exactly-once under a very specific failure scenario.

For simplicity, let's say we roll the current part file on checkpoint
(and only on checkpoint). The process is as follows:
1. Framework tells the sink to prepare for a checkpoint. This ultimately
results in 'onReceptionOfCheckpoint' being called on Bucket.java.
2. This takes the current file and, based on our roll policy of rolling on
checkpoint, closes it and uploads it to S3 as part of an MPU (multipart
upload). The reference to this upload is stored in 'pendingPartsPerCheckpoint'.
3. Once the checkpoint successfully completes, the bucket is notified via
'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
through all pendingPartsPerCheckpoint and, for each of them, recovers the
in-progress part (which doesn't exist in this scenario) and then commits the
upload.
4. The AmazonS3Client is ultimately called to perform the commit, and it
retries the attempt up to N times. If it exhausts its retries, it throws an
exception.
5. Upon successful commit of the MPU, Bucket clears out its references to
these uploads from its state.
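
To make the bookkeeping in steps 1-5 concrete, here is a minimal sketch of it.
It is a toy model, not Flink's Bucket class: ToyBucket, write(), the
'committed' list and the part names are hypothetical, and only the two
callback names and 'pendingPartsPerCheckpoint' are taken from the description
above. Uploads are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ToyBucket {
    /** Uploads that are closed but not yet committed, keyed by checkpoint id. */
    final NavigableMap<Long, List<String>> pendingPartsPerCheckpoint = new TreeMap<>();
    /** Uploads that have been committed (hypothetical stand-in for visible S3 objects). */
    final List<String> committed = new ArrayList<>();

    private String currentPart; // in-progress part file, or null if none is open
    private int partCounter = 0;

    /** Opens a part file on the first record after a roll; the record itself is ignored here. */
    void write(String record) {
        if (currentPart == null) {
            currentPart = "part-" + partCounter++;
        }
    }

    /** Steps 1-2: roll on checkpoint and remember the upload under this checkpoint id. */
    void onReceptionOfCheckpoint(long checkpointId) {
        if (currentPart != null) {
            pendingPartsPerCheckpoint
                    .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                    .add(currentPart);
            currentPart = null;
        }
    }

    /** Steps 3-5: commit everything pending up to this checkpoint, then drop the references. */
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPartsPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue()); // step 4: commit the upload(s)
            it.remove();                            // step 5: clear the reference
        }
    }

    public static void main(String[] args) {
        ToyBucket bucket = new ToyBucket();
        bucket.write("a");
        bucket.onReceptionOfCheckpoint(1L);
        bucket.write("b");
        bucket.onReceptionOfCheckpoint(2L);
        bucket.onSuccessfulCompletionOfCheckpoint(2L);
        System.out.println("committed: " + bucket.committed);
        System.out.println("still pending: " + bucket.pendingPartsPerCheckpoint);
    }
}
```

The window between the "step 4" and "step 5" lines in the loop is where the
crash in the scenario below would happen.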

Given this flow, I'm having trouble understanding how the following
scenario works:

   - Step 4: The commit on the MPU succeeds,
   - Step 5: Before this step completes, the task crashes. So at this point, S3
   has successfully completed the MPU but to the client (the Flink job), it
   has not completed.
   - Flink will then recover from the checkpoint we just took, and steps 3
   and 4 will be repeated. My understanding is that, since the MPU succeeded
   previously, any attempt at re-committing that upload will result in a 404
   ('NoSuchUpload'). So Step 4 should throw an exception, which would then be
   retried by the framework, and this process would repeat itself.
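
The scenario can be reproduced with a toy model. This is only a sketch under
the assumption stated above, namely that completing an already completed
upload fails with 'NoSuchUpload'. ToyStore and NoSuchUploadException are
hypothetical in-memory stand-ins, not the AWS SDK, and the upload id and key
are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class RecommitScenario {

    /** Hypothetical stand-in for the error returned when an upload id is unknown. */
    static class NoSuchUploadException extends RuntimeException {
        NoSuchUploadException(String uploadId) {
            super("NoSuchUpload: " + uploadId);
        }
    }

    /** Toy in-memory object store; models only what the scenario needs. */
    static class ToyStore {
        private final Map<String, byte[]> openUploads = new HashMap<>(); // uploadId -> data
        private final Map<String, byte[]> objects = new HashMap<>();     // key -> data

        void startUpload(String uploadId, byte[] data) {
            openUploads.put(uploadId, data);
        }

        /** Completing an upload consumes its id, so a second attempt fails. */
        void completeUpload(String uploadId, String key) {
            byte[] data = openUploads.remove(uploadId);
            if (data == null) {
                throw new NoSuchUploadException(uploadId);
            }
            objects.put(key, data);
        }

        boolean exists(String key) {
            return objects.containsKey(key);
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.startUpload("upload-1", "records".getBytes());

        // Step 4 succeeds on the store side.
        store.completeUpload("upload-1", "bucket/part-0-0");

        // The task crashes before step 5, so the restored state still
        // references upload-1 and the commit is attempted again.
        try {
            store.completeUpload("upload-1", "bucket/part-0-0");
        } catch (NoSuchUploadException e) {
            System.out.println("re-commit failed: " + e.getMessage());
        }

        // The object is nevertheless present from the first commit.
        System.out.println("object exists: " + store.exists("bucket/part-0-0"));
    }
}
```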

So how is this case handled?

Really appreciate the help!
-Kaustubh

Re: Exactly Once Guarantees with StreamingFileSink to S3

Posted by Kostas Kloudas <k....@da-platform.com>.
No problem!

On Wed, Feb 6, 2019 at 6:38 PM Kaustubh Rudrawar <ka...@box.com> wrote:

> Hi Kostas,
>
> Thanks for the response! Yes - I see the commitAfterRecovery being called
> when a Bucket is restored. I confused myself in thinking that
> 'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
> led me to believe that we were only calling commit and not
> commitAfterRecovery.
>
> Thanks for the clarification!
> -Kaustubh

Re: Exactly Once Guarantees with StreamingFileSink to S3

Posted by Kaustubh Rudrawar <ka...@box.com>.
Hi Kostas,

Thanks for the response! Yes - I see the commitAfterRecovery being called
when a Bucket is restored. I confused myself in thinking that
'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
led me to believe that we were only calling commit and not
commitAfterRecovery.

Thanks for the clarification!
-Kaustubh

On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi Kaustubh,
>
> Your general understanding is correct.
>
> In this case though, the sink will call the
> S3Committer#commitAfterRecovery() method. If committing the MPU fails, this
> method checks whether the file is there and whether its length is correct.
> If everything is OK (which is the case in your example), it continues with
> normal execution.
>
> I hope this helps.
>
> Kostas

Re: Exactly Once Guarantees with StreamingFileSink to S3

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Kaustubh,

Your general understanding is correct.

In this case though, the sink will call the
S3Committer#commitAfterRecovery() method. If committing the MPU fails, this
method checks whether the file is there and whether its length is correct.
If everything is OK (which is the case in your example), it continues with
normal execution.
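
As a rough illustration of that check, here is a minimal sketch. It is not
the actual S3Committer code: ToyStore, ToyCommitter and all of their fields
and methods are hypothetical in-memory stand-ins, and the only thing taken
from the explanation above is the order of operations (try the commit, and on
failure verify that the object exists with the expected length).

```java
import java.util.HashMap;
import java.util.Map;

public class RecoveryCommitSketch {

    /** Toy in-memory object store; a stand-in for S3, not the AWS SDK. */
    static class ToyStore {
        private final Map<String, Long> openUploads = new HashMap<>(); // uploadId -> length
        private final Map<String, Long> objects = new HashMap<>();     // key -> length

        void startUpload(String uploadId, long length) {
            openUploads.put(uploadId, length);
        }

        void completeUpload(String uploadId, String key) {
            Long length = openUploads.remove(uploadId);
            if (length == null) {
                throw new IllegalStateException("NoSuchUpload: " + uploadId);
            }
            objects.put(key, length);
        }

        /** Returns the object's length, or -1 if it does not exist. */
        long lengthOf(String key) {
            return objects.getOrDefault(key, -1L);
        }
    }

    /** Hypothetical committer with a normal path and a recovery path. */
    static class ToyCommitter {
        private final ToyStore store;
        private final String uploadId;
        private final String key;
        private final long expectedLength;

        ToyCommitter(ToyStore store, String uploadId, String key, long expectedLength) {
            this.store = store;
            this.uploadId = uploadId;
            this.key = key;
            this.expectedLength = expectedLength;
        }

        /** Normal path: a failed commit is an error. */
        void commit() {
            store.completeUpload(uploadId, key);
        }

        /** Recovery path: a failed commit is fine if the object is already there. */
        void commitAfterRecovery() {
            try {
                commit();
            } catch (IllegalStateException e) {
                long actual = store.lengthOf(key);
                if (actual < 0 || actual != expectedLength) {
                    throw new IllegalStateException(
                            "Upload " + uploadId + " is neither pending nor committed as " + key, e);
                }
                // Object exists with the expected length: treat it as committed.
            }
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.startUpload("upload-1", 7L);
        ToyCommitter committer = new ToyCommitter(store, "upload-1", "bucket/part-0-0", 7L);

        committer.commit();              // step 4 succeeds, then the task crashes
        committer.commitAfterRecovery(); // on restore: re-commit fails, the check passes
        System.out.println("recovered without error");
    }
}
```

The point of having two entry points is that the plain commit can stay strict
during normal operation, while the tolerant check is applied only when state
is restored.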

I hope this helps.

Kostas
