You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Richard Deurwaarder <ri...@xeli.eu> on 2020/01/28 17:24:57 UTC

Does flink support retries on checkpoint write failures

Hi all,

We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to
Google Cloud Storage[1]. We've noticed that jobs with a large amount of
state (500gb range) are becoming *very* unstable. In the order of
restarting once an hour or even more.

The reason for this instability is that we run into "410 Gone"[4] errors
from Google Cloud Storage. This indicates an upload (write from Flink's
perspective) took place and it wanted to resume the write[2] but could not
find the file which it needed to resume. My guess is this is because the
previous attempt either failed or perhaps it uploads in chunks of 67mb [3].

The library logs this line when this happens:

"Encountered status code 410 when accessing URL
https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
Delegating to response handler for possible retry."

We're kind of stuck on these questions:
* Is flink capable or doing these retries?
* Does anyone succesfully write their (rocksdb) state to Google Cloud
storage for bigger state sizes?
* Is it possible flink renames or deletes certain directories before all
flushes have been done based on an atomic guarantee provided by HDFS that
does not hold on other implementations perhaps? A race condition of sorts

Basically does anyone recognize this behavior?

Regards,

Richard Deurwaarder

[1] We use an HDFS implementation provided by Google
https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
[2] https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
[3]
https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md
(see
fs.gs.outputstream.upload.chunk.size)
[4] Stacktrace:
https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492

Re: Does flink support retries on checkpoint write failures

Posted by Till Rohrmann <tr...@apache.org>.
Glad to hear that you could solve/mitigate the problem and thanks for
letting us know.

Cheers,
Till

On Sat, Feb 1, 2020 at 2:45 PM Richard Deurwaarder <ri...@xeli.eu> wrote:

> Hi Till & others,
>
> We enabled setFailOnCheckpointingErrors
> (setTolerableCheckpointFailureNumber isn't available in 1.8) and this
> indeed prevents the large number of restarts.
>
> Hopefully a solution for the reported issue[1] with google gets found but
> for now this solved our immediate problem.
>
> Thanks again!
>
> [1] https://issuetracker.google.com/issues/137168102
>
> Regards,
>
> Richard
>
> On Thu, Jan 30, 2020 at 11:40 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> If a checkpoint is not successful, it cannot be used for recovery.
>> That means Flink will restart to the last successful checkpoint and hence
>> not lose any data.
>>
>> On Wed, Jan 29, 2020 at 9:52 PM wvl <le...@gmail.com> wrote:
>>
>>> Forgive my lack of knowledge here - I'm a bit out of my league here.
>>>
>>> But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
>>> for which somehow caused a record to be lost (e.g. rocksdb exception /
>>> taskmanager crash / etc), there would be no Source rewind to the last
>>> successful checkpoint and this record would be lost forever, correct?
>>>
>>> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder, <ri...@xeli.eu> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I'll see if we can ask google to comment on those issues, perhaps they
>>>> have a fix in the works that would solve the root problem.
>>>> In the meanwhile
>>>> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
>>>> promising!
>>>> Thank you for this. I'm going to try this tomorrow to see if that
>>>> helps. I will let you know!
>>>>
>>>> Richard
>>>>
>>>> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Richard,
>>>>>
>>>>> googling a bit indicates that this might actually be a GCS problem [1,
>>>>> 2, 3]. The proposed solution/workaround so far is to retry the whole upload
>>>>> operation as part of the application logic. Since I assume that you are
>>>>> writing to GCS via Hadoop's file system this should actually fall into the
>>>>> realm of the Hadoop file system implementation and not Flink.
>>>>>
>>>>> What you could do to mitigate the problem a bit is to set the number
>>>>> of tolerable checkpoint failures to a non-zero value via
>>>>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>>>>> means that the job will only fail and then restart after `n` checkpoint
>>>>> failures. Unfortunately, we do not support a failure rate yet.
>>>>>
>>>>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>>>>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>>>>> [3] https://issuetracker.google.com/issues/137168102
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We've got a Flink job running on 1.8.0 which writes its state
>>>>>> (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a large
>>>>>> amount of state (500gb range) are becoming *very* unstable. In the order of
>>>>>> restarting once an hour or even more.
>>>>>>
>>>>>> The reason for this instability is that we run into "410 Gone"[4]
>>>>>> errors from Google Cloud Storage. This indicates an upload (write from
>>>>>> Flink's perspective) took place and it wanted to resume the write[2] but
>>>>>> could not find the file which it needed to resume. My guess is this is
>>>>>> because the previous attempt either failed or perhaps it uploads in chunks
>>>>>> of 67mb [3].
>>>>>>
>>>>>> The library logs this line when this happens:
>>>>>>
>>>>>> "Encountered status code 410 when accessing URL
>>>>>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>>>>> Delegating to response handler for possible retry."
>>>>>>
>>>>>> We're kind of stuck on these questions:
>>>>>> * Is flink capable or doing these retries?
>>>>>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>>>>>> storage for bigger state sizes?
>>>>>> * Is it possible flink renames or deletes certain directories before
>>>>>> all flushes have been done based on an atomic guarantee provided by HDFS
>>>>>> that does not hold on other implementations perhaps? A race condition of
>>>>>> sorts
>>>>>>
>>>>>> Basically does anyone recognize this behavior?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Richard Deurwaarder
>>>>>>
>>>>>> [1] We use an HDFS implementation provided by Google
>>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>>>>> [2]
>>>>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>>>>> [3]
>>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
>>>>>> fs.gs.outputstream.upload.chunk.size)
>>>>>> [4] Stacktrace:
>>>>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>>>>
>>>>>

Re: Does flink support retries on checkpoint write failures

Posted by Richard Deurwaarder <ri...@xeli.eu>.
Hi Till & others,

We enabled setFailOnCheckpointingErrors
(setTolerableCheckpointFailureNumber isn't available in 1.8) and this
indeed prevents the large number of restarts.

Hopefully a solution for the reported issue[1] with google gets found but
for now this solved our immediate problem.

Thanks again!

[1] https://issuetracker.google.com/issues/137168102

Regards,

Richard

On Thu, Jan 30, 2020 at 11:40 AM Arvid Heise <ar...@ververica.com> wrote:

> If a checkpoint is not successful, it cannot be used for recovery.
> That means Flink will restart to the last successful checkpoint and hence
> not lose any data.
>
> On Wed, Jan 29, 2020 at 9:52 PM wvl <le...@gmail.com> wrote:
>
>> Forgive my lack of knowledge here - I'm a bit out of my league here.
>>
>> But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
>> for which somehow caused a record to be lost (e.g. rocksdb exception /
>> taskmanager crash / etc), there would be no Source rewind to the last
>> successful checkpoint and this record would be lost forever, correct?
>>
>> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder, <ri...@xeli.eu> wrote:
>>
>>> Hi Till,
>>>
>>> I'll see if we can ask google to comment on those issues, perhaps they
>>> have a fix in the works that would solve the root problem.
>>> In the meanwhile
>>> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
>>> promising!
>>> Thank you for this. I'm going to try this tomorrow to see if that helps.
>>> I will let you know!
>>>
>>> Richard
>>>
>>> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> googling a bit indicates that this might actually be a GCS problem [1,
>>>> 2, 3]. The proposed solution/workaround so far is to retry the whole upload
>>>> operation as part of the application logic. Since I assume that you are
>>>> writing to GCS via Hadoop's file system this should actually fall into the
>>>> realm of the Hadoop file system implementation and not Flink.
>>>>
>>>> What you could do to mitigate the problem a bit is to set the number of
>>>> tolerable checkpoint failures to a non-zero value via
>>>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>>>> means that the job will only fail and then restart after `n` checkpoint
>>>> failures. Unfortunately, we do not support a failure rate yet.
>>>>
>>>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>>>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>>>> [3] https://issuetracker.google.com/issues/137168102
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We've got a Flink job running on 1.8.0 which writes its state
>>>>> (rocksdb) to Google Cloud Storage[1]. We've noticed that jobs with a large
>>>>> amount of state (500gb range) are becoming *very* unstable. In the order of
>>>>> restarting once an hour or even more.
>>>>>
>>>>> The reason for this instability is that we run into "410 Gone"[4]
>>>>> errors from Google Cloud Storage. This indicates an upload (write from
>>>>> Flink's perspective) took place and it wanted to resume the write[2] but
>>>>> could not find the file which it needed to resume. My guess is this is
>>>>> because the previous attempt either failed or perhaps it uploads in chunks
>>>>> of 67mb [3].
>>>>>
>>>>> The library logs this line when this happens:
>>>>>
>>>>> "Encountered status code 410 when accessing URL
>>>>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>>>> Delegating to response handler for possible retry."
>>>>>
>>>>> We're kind of stuck on these questions:
>>>>> * Is flink capable or doing these retries?
>>>>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>>>>> storage for bigger state sizes?
>>>>> * Is it possible flink renames or deletes certain directories before
>>>>> all flushes have been done based on an atomic guarantee provided by HDFS
>>>>> that does not hold on other implementations perhaps? A race condition of
>>>>> sorts
>>>>>
>>>>> Basically does anyone recognize this behavior?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Richard Deurwaarder
>>>>>
>>>>> [1] We use an HDFS implementation provided by Google
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>>>> [2]
>>>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>>>> [3]
>>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
>>>>> fs.gs.outputstream.upload.chunk.size)
>>>>> [4] Stacktrace:
>>>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>>>
>>>>

Re: Does flink support retries on checkpoint write failures

Posted by Arvid Heise <ar...@ververica.com>.
If a checkpoint is not successful, it cannot be used for recovery.
That means Flink will restart to the last successful checkpoint and hence
not lose any data.

On Wed, Jan 29, 2020 at 9:52 PM wvl <le...@gmail.com> wrote:

> Forgive my lack of knowledge here - I'm a bit out of my league here.
>
> But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
> for which somehow caused a record to be lost (e.g. rocksdb exception /
> taskmanager crash / etc), there would be no Source rewind to the last
> successful checkpoint and this record would be lost forever, correct?
>
> On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder, <ri...@xeli.eu> wrote:
>
>> Hi Till,
>>
>> I'll see if we can ask google to comment on those issues, perhaps they
>> have a fix in the works that would solve the root problem.
>> In the meanwhile
>> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
>> promising!
>> Thank you for this. I'm going to try this tomorrow to see if that helps.
>> I will let you know!
>>
>> Richard
>>
>> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Richard,
>>>
>>> googling a bit indicates that this might actually be a GCS problem [1,
>>> 2, 3]. The proposed solution/workaround so far is to retry the whole upload
>>> operation as part of the application logic. Since I assume that you are
>>> writing to GCS via Hadoop's file system this should actually fall into the
>>> realm of the Hadoop file system implementation and not Flink.
>>>
>>> What you could do to mitigate the problem a bit is to set the number of
>>> tolerable checkpoint failures to a non-zero value via
>>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>>> means that the job will only fail and then restart after `n` checkpoint
>>> failures. Unfortunately, we do not support a failure rate yet.
>>>
>>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>>> [3] https://issuetracker.google.com/issues/137168102
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We've got a Flink job running on 1.8.0 which writes its state (rocksdb)
>>>> to Google Cloud Storage[1]. We've noticed that jobs with a large amount of
>>>> state (500gb range) are becoming *very* unstable. In the order of
>>>> restarting once an hour or even more.
>>>>
>>>> The reason for this instability is that we run into "410 Gone"[4]
>>>> errors from Google Cloud Storage. This indicates an upload (write from
>>>> Flink's perspective) took place and it wanted to resume the write[2] but
>>>> could not find the file which it needed to resume. My guess is this is
>>>> because the previous attempt either failed or perhaps it uploads in chunks
>>>> of 67mb [3].
>>>>
>>>> The library logs this line when this happens:
>>>>
>>>> "Encountered status code 410 when accessing URL
>>>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>>> Delegating to response handler for possible retry."
>>>>
>>>> We're kind of stuck on these questions:
>>>> * Is flink capable or doing these retries?
>>>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>>>> storage for bigger state sizes?
>>>> * Is it possible flink renames or deletes certain directories before
>>>> all flushes have been done based on an atomic guarantee provided by HDFS
>>>> that does not hold on other implementations perhaps? A race condition of
>>>> sorts
>>>>
>>>> Basically does anyone recognize this behavior?
>>>>
>>>> Regards,
>>>>
>>>> Richard Deurwaarder
>>>>
>>>> [1] We use an HDFS implementation provided by Google
>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>>> [2]
>>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>>> [3]
>>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
>>>> fs.gs.outputstream.upload.chunk.size)
>>>> [4] Stacktrace:
>>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>>
>>>

Re: Does flink support retries on checkpoint write failures

Posted by wvl <le...@gmail.com>.
Forgive my lack of knowledge here - I'm a bit out of my league here.

But I was wondering if allowing e.g. 1 checkpoint to fail and the reason
for which somehow caused a record to be lost (e.g. rocksdb exception /
taskmanager crash / etc), there would be no Source rewind to the last
successful checkpoint and this record would be lost forever, correct?

On Wed, 29 Jan 2020, 17:51 Richard Deurwaarder, <ri...@xeli.eu> wrote:

> Hi Till,
>
> I'll see if we can ask google to comment on those issues, perhaps they
> have a fix in the works that would solve the root problem.
> In the meanwhile
> `CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
> promising!
> Thank you for this. I'm going to try this tomorrow to see if that helps. I
> will let you know!
>
> Richard
>
> On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Richard,
>>
>> googling a bit indicates that this might actually be a GCS problem [1, 2,
>> 3]. The proposed solution/workaround so far is to retry the whole upload
>> operation as part of the application logic. Since I assume that you are
>> writing to GCS via Hadoop's file system this should actually fall into the
>> realm of the Hadoop file system implementation and not Flink.
>>
>> What you could do to mitigate the problem a bit is to set the number of
>> tolerable checkpoint failures to a non-zero value via
>> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
>> means that the job will only fail and then restart after `n` checkpoint
>> failures. Unfortunately, we do not support a failure rate yet.
>>
>> [1] https://github.com/googleapis/google-cloud-java/issues/3586
>> [2] https://github.com/googleapis/google-cloud-java/issues/5704
>> [3] https://issuetracker.google.com/issues/137168102
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu>
>> wrote:
>>
>>> Hi all,
>>>
>>> We've got a Flink job running on 1.8.0 which writes its state (rocksdb)
>>> to Google Cloud Storage[1]. We've noticed that jobs with a large amount of
>>> state (500gb range) are becoming *very* unstable. In the order of
>>> restarting once an hour or even more.
>>>
>>> The reason for this instability is that we run into "410 Gone"[4] errors
>>> from Google Cloud Storage. This indicates an upload (write from Flink's
>>> perspective) took place and it wanted to resume the write[2] but could not
>>> find the file which it needed to resume. My guess is this is because the
>>> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>>>
>>> The library logs this line when this happens:
>>>
>>> "Encountered status code 410 when accessing URL
>>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>>> Delegating to response handler for possible retry."
>>>
>>> We're kind of stuck on these questions:
>>> * Is flink capable or doing these retries?
>>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>>> storage for bigger state sizes?
>>> * Is it possible flink renames or deletes certain directories before all
>>> flushes have been done based on an atomic guarantee provided by HDFS that
>>> does not hold on other implementations perhaps? A race condition of sorts
>>>
>>> Basically does anyone recognize this behavior?
>>>
>>> Regards,
>>>
>>> Richard Deurwaarder
>>>
>>> [1] We use an HDFS implementation provided by Google
>>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>>> [2]
>>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>>> [3]
>>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
>>> fs.gs.outputstream.upload.chunk.size)
>>> [4] Stacktrace:
>>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>>
>>

Re: Does flink support retries on checkpoint write failures

Posted by Richard Deurwaarder <ri...@xeli.eu>.
Hi Till,

I'll see if we can ask google to comment on those issues, perhaps they have
a fix in the works that would solve the root problem.
In the meanwhile
`CheckpointConfig.setTolerableCheckpointFailureNumber` sounds very
promising!
Thank you for this. I'm going to try this tomorrow to see if that helps. I
will let you know!

Richard

On Wed, Jan 29, 2020 at 3:47 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi Richard,
>
> googling a bit indicates that this might actually be a GCS problem [1, 2,
> 3]. The proposed solution/workaround so far is to retry the whole upload
> operation as part of the application logic. Since I assume that you are
> writing to GCS via Hadoop's file system this should actually fall into the
> realm of the Hadoop file system implementation and not Flink.
>
> What you could do to mitigate the problem a bit is to set the number of
> tolerable checkpoint failures to a non-zero value via
> `CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
> means that the job will only fail and then restart after `n` checkpoint
> failures. Unfortunately, we do not support a failure rate yet.
>
> [1] https://github.com/googleapis/google-cloud-java/issues/3586
> [2] https://github.com/googleapis/google-cloud-java/issues/5704
> [3] https://issuetracker.google.com/issues/137168102
>
> Cheers,
> Till
>
> On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu>
> wrote:
>
>> Hi all,
>>
>> We've got a Flink job running on 1.8.0 which writes its state (rocksdb)
>> to Google Cloud Storage[1]. We've noticed that jobs with a large amount of
>> state (500gb range) are becoming *very* unstable. In the order of
>> restarting once an hour or even more.
>>
>> The reason for this instability is that we run into "410 Gone"[4] errors
>> from Google Cloud Storage. This indicates an upload (write from Flink's
>> perspective) took place and it wanted to resume the write[2] but could not
>> find the file which it needed to resume. My guess is this is because the
>> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>>
>> The library logs this line when this happens:
>>
>> "Encountered status code 410 when accessing URL
>> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
>> Delegating to response handler for possible retry."
>>
>> We're kind of stuck on these questions:
>> * Is flink capable or doing these retries?
>> * Does anyone succesfully write their (rocksdb) state to Google Cloud
>> storage for bigger state sizes?
>> * Is it possible flink renames or deletes certain directories before all
>> flushes have been done based on an atomic guarantee provided by HDFS that
>> does not hold on other implementations perhaps? A race condition of sorts
>>
>> Basically does anyone recognize this behavior?
>>
>> Regards,
>>
>> Richard Deurwaarder
>>
>> [1] We use an HDFS implementation provided by Google
>> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
>> [2]
>> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
>> [3]
>> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
>> fs.gs.outputstream.upload.chunk.size)
>> [4] Stacktrace:
>> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>>
>

Re: Does flink support retries on checkpoint write failures

Posted by Till Rohrmann <tr...@apache.org>.
Hi Richard,

googling a bit indicates that this might actually be a GCS problem [1, 2,
3]. The proposed solution/workaround so far is to retry the whole upload
operation as part of the application logic. Since I assume that you are
writing to GCS via Hadoop's file system this should actually fall into the
realm of the Hadoop file system implementation and not Flink.

What you could do to mitigate the problem a bit is to set the number of
tolerable checkpoint failures to a non-zero value via
`CheckpointConfig.setTolerableCheckpointFailureNumber`. Setting this to `n`
means that the job will only fail and then restart after `n` checkpoint
failures. Unfortunately, we do not support a failure rate yet.

[1] https://github.com/googleapis/google-cloud-java/issues/3586
[2] https://github.com/googleapis/google-cloud-java/issues/5704
[3] https://issuetracker.google.com/issues/137168102

Cheers,
Till

On Tue, Jan 28, 2020 at 6:25 PM Richard Deurwaarder <ri...@xeli.eu> wrote:

> Hi all,
>
> We've got a Flink job running on 1.8.0 which writes its state (rocksdb) to
> Google Cloud Storage[1]. We've noticed that jobs with a large amount of
> state (500gb range) are becoming *very* unstable. In the order of
> restarting once an hour or even more.
>
> The reason for this instability is that we run into "410 Gone"[4] errors
> from Google Cloud Storage. This indicates an upload (write from Flink's
> perspective) took place and it wanted to resume the write[2] but could not
> find the file which it needed to resume. My guess is this is because the
> previous attempt either failed or perhaps it uploads in chunks of 67mb [3].
>
> The library logs this line when this happens:
>
> "Encountered status code 410 when accessing URL
> https://www.googleapis.com/upload/storage/v1/b/<project>/o?ifGenerationMatch=0&name=job-manager/15aa2391-a055-4bfd-8d82-e9e4806baa9c/8ae818761055cdc022822010a8b4a1ed/chk-52224/_metadata&uploadType=resumable&upload_id=AEnB2UqJwkdrQ8YuzqrTp9Nk4bDnzbuJcTlD5E5hKNLNz4xQ7vjlYrDzYC29ImHcp0o6OjSCmQo6xkDSj5OHly7aChH0JxxXcg.
> Delegating to response handler for possible retry."
>
> We're kind of stuck on these questions:
> * Is flink capable or doing these retries?
> * Does anyone succesfully write their (rocksdb) state to Google Cloud
> storage for bigger state sizes?
> * Is it possible flink renames or deletes certain directories before all
> flushes have been done based on an atomic guarantee provided by HDFS that
> does not hold on other implementations perhaps? A race condition of sorts
>
> Basically does anyone recognize this behavior?
>
> Regards,
>
> Richard Deurwaarder
>
> [1] We use an HDFS implementation provided by Google
> https://github.com/GoogleCloudDataproc/bigdata-interop/tree/master/gcs
> [2]
> https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
> [3]
> https://github.com/GoogleCloudDataproc/bigdata-interop/blob/master/gcs/CONFIGURATION.md (see
> fs.gs.outputstream.upload.chunk.size)
> [4] Stacktrace:
> https://gist.github.com/Xeli/da4c0af2c49c060139ad01945488e492
>