Posted to user@flink.apache.org by Cristian <kn...@fastmail.fm> on 2020/09/05 00:11:06 UTC

Checkpoint metadata deleted by Flink after ZK connection issues

Hello guys.

We run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official Docker image.

Usually, when our jobs crash for any reason, they will resume from the latest checkpoint. This is the expected behavior and has been working fine for years.

But we encountered an issue with a job that crashed apparently because it lost connectivity with Zookeeper.

The logs for this job can be found here: https://pastebin.com/raw/uH9KDU2L (I redacted boring or private stuff and annotated the relevant parts).

From what I can tell, this line was called:

```
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
	applicationStatus,
	null,
	true);
```
https://github.com/apache/flink/blob/6b9cdd41743edd24a929074d62a57b84e7b2dd97/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L243-L246

which seems pretty dangerous because it ends up calling

HighAvailabilityServices.closeAndCleanupAllData()
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L225-L239

To me this looks like a dangerous default... why would we ever want to delete the checkpoint metadata, except when explicitly canceling/stopping the job?

I think that if/else branch means something like: if the job crashed (i.e. `throwable != null`), then DO NOT wipe out the state. Otherwise, delete it. But in this case... it seems like `throwable` was indeed null, which caused the job to delete the checkpoint data before dying.
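
For reference, this is roughly the branch I mean, condensed from the same ClusterEntrypoint code (the comments are my interpretation of what the last boolean argument does):

```
clusterComponent.getShutDownFuture().whenComplete(
	(ApplicationStatus applicationStatus, Throwable throwable) -> {
		if (throwable != null) {
			// a throwable was reported: shut down WITHOUT cleaning up the HA data
			shutDownAsync(
				ApplicationStatus.UNKNOWN,
				ExceptionUtils.stringifyException(throwable),
				false);
		} else {
			// no throwable: general shutdown path, and the final `true` means the
			// HA data (including the checkpoint metadata in ZK) gets cleaned up
			shutDownAsync(
				applicationStatus,
				null,
				true);
		}
	});
```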

At this point, I'm just guessing really... I don't really know if this is what happened in this case. Hopefully someone with more knowledge of how this works can give us a hand.

Thanks.

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Till Rohrmann <tr...@apache.org>.
Great, thanks Klou!

Cheers,
Till

On Mon, Sep 28, 2020 at 5:07 PM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi all,
>
> I will have a look.
>
> Kostas
>
> On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann <tr...@apache.org>
> wrote:
> >
> > Hi Cristian,
> >
> > thanks for reporting this issue. It looks indeed like a very critical
> problem.
> >
> > The problem seems to be that the ApplicationDispatcherBootstrap class
> produces an exception (that the request job can no longer be found because
> of a lost ZooKeeper connection) which will be interpreted as a job failure.
> Due to this interpretation, the cluster will be shut down with a terminal
> state of FAILED which will cause the HA data to be cleaned up. The exact
> problem occurs in the JobStatusPollingUtils.getJobResult which is called by
> ApplicationDispatcherBootstrap.getJobResult().
> >
> > I think there are two problems here: First of all not every exception
> bubbling up in the future returned by
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
> job failure. Some of them can also indicate a framework failure which
> should not lead to the clean up of HA data. The other problem is that the
> polling logic cannot properly handle a temporary connection loss to
> ZooKeeper which is a normal situation.
> >
> > I am pulling in Aljoscha and Klou who worked on this feature and might
> be able to propose a solution for these problems. I've also updated the
> JIRA issue FLINK-19154.
> >
> > Cheers,
> > Till
> >
> > On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <da...@gmail.com> wrote:
> >>
> >> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >> Since we could submit multiple jobs into a Flink session, what i mean
> is when a job
> >> reached to the terminal state, the sub node(e.g.
> /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> >> on the Zookeeper will be cleaned up. But the root
> directory(/flink/application_xxxx/) still exists.
> >>
> >>
> >> For your current case, it is a different case(perjob cluster). I think
> we need to figure out why the only
> >> running job reached the terminal state. For example, the restart
> attempts are exhausted. And you
> >> could find the following logs in your JobManager log.
> >>
> >> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
> >>
> >>
> >> Best,
> >> Yang
> >>
> >>
> >>
> >>
> >> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午11:26写道:
> >>>
> >>> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> >>>
> >>> What does this mean?
> >>>
> >>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
> >>>
> >>> The only cases where I expect Flink to clean up the checkpoint data
> from ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
> >>>
> >>> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
> >>>
> >>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
> >>>
> >>> AFAIK, the HA data, including Zookeeper meta data and real data on
> DFS, will only be cleaned up
> >>> when the Flink cluster reached terminated state.
> >>>
> >>> So if you are using a session cluster, the root cluster node on Zk
> will be cleaned up after you manually
> >>> stop the session cluster. The job sub directory will be cleaned up
> when the job finished/canceled/failed.
> >>>
> >>> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> >>> be cleaned up. I think you need to check the job restart strategy you
> have set. For example, the following
> >>> configuration will make the Flink cluster terminated after 10 attempts.
> >>>
> >>> restart-strategy: fixed-delay
> >>> restart-strategy.fixed-delay.attempts: 10
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:
> >>>
> >>>
> >>> I'm using the standalone script to start the cluster.
> >>>
> >>> As far as I can tell, it's not easy to reproduce. We found that
> zookeeper lost a node around the time this happened, but all of our other
> 75 Flink jobs which use the same setup, version and zookeeper, didn't have
> any issues. They didn't even restart.
> >>>
> >>> So unfortunately I don't know how to reproduce this. All I know is I
> can't sleep. I have nightmares were my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckon is near. Flinkpocalypse!
> >>>
> >>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
> >>>
> >>> Thanks a lot for reporting this problem here Cristian!
> >>>
> >>> I am not super familiar with the involved components, but the behavior
> you are describing doesn't sound right to me.
> >>> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting
> StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
> >>>
> >>> Do you know by chance if this problem is reproducible? With the
> StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
> >>>
> >>> Hi Cristian,
> >>>
> >>>
> >>> I don't know if it was designed to be like this deliberately.
> >>>
> >>> So I have already submitted an issue ,and wait for somebody to
> response.
> >>>
> >>> https://issues.apache.org/jira/browse/FLINK-19154
> >>>
> >>>
> >>>
> >>> --
> >>> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >>>
> >>>
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi all,

I will have a look.

Kostas

On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann <tr...@apache.org> wrote:
>
> Hi Cristian,
>
> thanks for reporting this issue. It looks indeed like a very critical problem.
>
> The problem seems to be that the ApplicationDispatcherBootstrap class produces an exception (that the request job can no longer be found because of a lost ZooKeeper connection) which will be interpreted as a job failure. Due to this interpretation, the cluster will be shut down with a terminal state of FAILED which will cause the HA data to be cleaned up. The exact problem occurs in the JobStatusPollingUtils.getJobResult which is called by ApplicationDispatcherBootstrap.getJobResult().
>
> I think there are two problems here: First of all not every exception bubbling up in the future returned by ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a job failure. Some of them can also indicate a framework failure which should not lead to the clean up of HA data. The other problem is that the polling logic cannot properly handle a temporary connection loss to ZooKeeper which is a normal situation.
>
> I am pulling in Aljoscha and Klou who worked on this feature and might be able to propose a solution for these problems. I've also updated the JIRA issue FLINK-19154.
>
> Cheers,
> Till
>
> On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <da...@gmail.com> wrote:
>>
>> > The job sub directory will be cleaned up when the job finished/canceled/failed.
>> Since we could submit multiple jobs into a Flink session, what i mean is when a job
>> reached to the terminal state, the sub node(e.g. /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1)
>> on the Zookeeper will be cleaned up. But the root directory(/flink/application_xxxx/) still exists.
>>
>>
>> For your current case, it is a different case(perjob cluster). I think we need to figure out why the only
>> running job reached the terminal state. For example, the restart attempts are exhausted. And you
>> could find the following logs in your JobManager log.
>>
>> "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"
>>
>>
>> Best,
>> Yang
>>
>>
>>
>>
>> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午11:26写道:
>>>
>>> > The job sub directory will be cleaned up when the job finished/canceled/failed.
>>>
>>> What does this mean?
>>>
>>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>>
>>> The only cases where I expect Flink to clean up the checkpoint data from ZK is when I explicitly stop or cancel the job (in those cases the job manager takes a savepoint before cleaning up zk and finishing the cluster).
>>>
>>> Which is not the case here. Flink was on autopilot here and decided to wipe my poor, good checkpoint metadata as the logs show.
>>>
>>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>>
>>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, will only be cleaned up
>>> when the Flink cluster reached terminated state.
>>>
>>> So if you are using a session cluster, the root cluster node on Zk will be cleaned up after you manually
>>> stop the session cluster. The job sub directory will be cleaned up when the job finished/canceled/failed.
>>>
>>> If you are using a job/application cluster, once the only running job finished/failed, all the HA data will
>>> be cleaned up. I think you need to check the job restart strategy you have set. For example, the following
>>> configuration will make the Flink cluster terminated after 10 attempts.
>>>
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:
>>>
>>>
>>> I'm using the standalone script to start the cluster.
>>>
>>> As far as I can tell, it's not easy to reproduce. We found that zookeeper lost a node around the time this happened, but all of our other 75 Flink jobs which use the same setup, version and zookeeper, didn't have any issues. They didn't even restart.
>>>
>>> So unfortunately I don't know how to reproduce this. All I know is I can't sleep. I have nightmares were my precious state is deleted. I wake up crying and quickly start manually savepointing all jobs just in case, because I feel the day of reckon is near. Flinkpocalypse!
>>>
>>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>>
>>> Thanks a lot for reporting this problem here Cristian!
>>>
>>> I am not super familiar with the involved components, but the behavior you are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: "2020-09-08 14:45:32,807 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>>
>>> Do you know by chance if this problem is reproducible? With the StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
>>>
>>>
>>>
>>>
>>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>>>
>>> Hi Cristian,
>>>
>>>
>>> I don't know if it was designed to be like this deliberately.
>>>
>>> So I have already submitted an issue ,and wait for somebody to response.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-19154
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Till Rohrmann <tr...@apache.org>.
Hi Cristian,

thanks for reporting this issue. It looks indeed like a very critical
problem.

The problem seems to be that the ApplicationDispatcherBootstrap class
produces an exception (that the requested job can no longer be found because
of a lost ZooKeeper connection) which will be interpreted as a job failure.
Due to this interpretation, the cluster will be shut down with a terminal
state of FAILED which will cause the HA data to be cleaned up. The exact
problem occurs in the JobStatusPollingUtils.getJobResult which is called by
ApplicationDispatcherBootstrap.getJobResult().

I think there are two problems here: First of all, not every exception
bubbling up in the future returned by
ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
job failure. Some of them can also indicate a framework failure which
should not lead to the clean up of HA data. The other problem is that the
polling logic cannot properly handle a temporary connection loss to
ZooKeeper which is a normal situation.
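
To make the first point a bit more concrete: the completion handler would need to classify the throwable before picking a terminal status, roughly along the following lines. This is purely an illustration, not the current Flink code and not a concrete fix proposal; `isFrameworkFailure` and the ZooKeeper check are placeholders.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FailureClassificationSketch {

	enum ApplicationStatus { SUCCEEDED, FAILED }

	// placeholder: a real implementation would inspect the exception chain for
	// infrastructure problems such as a temporarily lost ZooKeeper connection
	static boolean isFrameworkFailure(Throwable t) {
		return t.getMessage() != null && t.getMessage().contains("ZooKeeper");
	}

	static CompletableFuture<ApplicationStatus> classify(CompletableFuture<Void> jobResult) {
		return jobResult.handle((ignored, t) -> {
			if (t == null) {
				return ApplicationStatus.SUCCEEDED; // the job genuinely finished
			}
			if (isFrameworkFailure(t)) {
				// framework failure: keep it exceptional instead of mapping it to FAILED,
				// so the caller does not shut the cluster down and clean up the HA data
				throw new CompletionException(t);
			}
			return ApplicationStatus.FAILED; // genuine job failure
		});
	}
}
```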

I am pulling in Aljoscha and Klou who worked on this feature and might be
able to propose a solution for these problems. I've also updated the JIRA
issue FLINK-19154.

Cheers,
Till

On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <da...@gmail.com> wrote:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> Since we could submit multiple jobs into a Flink session, what i mean is
> when a job
> reached to the terminal state, the sub node(e.g.
> /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> on the Zookeeper will be cleaned up. But the root
> directory(/flink/application_xxxx/) still exists.
>
>
> For your current case, it is a different case(perjob cluster). I think we
> need to figure out why the only
> running job reached the terminal state. For example, the restart attempts
> are exhausted. And you
> could find the following logs in your JobManager log.
>
> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
>
>
> Best,
> Yang
>
>
>
>
> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午11:26写道:
>
>> > The job sub directory will be cleaned up when the job
>> finished/canceled/failed.
>>
>> What does this mean?
>>
>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>
>> The only cases where I expect Flink to clean up the checkpoint data from
>> ZK is when I explicitly stop or cancel the job (in those cases the job
>> manager takes a savepoint before cleaning up zk and finishing the cluster).
>>
>> Which is not the case here. Flink was on autopilot here and decided to
>> wipe my poor, good checkpoint metadata as the logs show.
>>
>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>
>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
>> will only be cleaned up
>> when the Flink cluster reached terminated state.
>>
>> So if you are using a session cluster, the root cluster node on Zk will
>> be cleaned up after you manually
>> stop the session cluster. The job sub directory will be cleaned up when
>> the job finished/canceled/failed.
>>
>> If you are using a job/application cluster, once the only running job
>> finished/failed, all the HA data will
>> be cleaned up. I think you need to check the job restart strategy you
>> have set. For example, the following
>> configuration will make the Flink cluster terminated after 10 attempts.
>>
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>>
>>
>> Best,
>> Yang
>>
>> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:
>>
>>
>> I'm using the standalone script to start the cluster.
>>
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper
>> lost a node around the time this happened, but all of our other 75 Flink
>> jobs which use the same setup, version and zookeeper, didn't have any
>> issues. They didn't even restart.
>>
>> So unfortunately I don't know how to reproduce this. All I know is I
>> can't sleep. I have nightmares were my precious state is deleted. I wake up
>> crying and quickly start manually savepointing all jobs just in case,
>> because I feel the day of reckon is near. Flinkpocalypse!
>>
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>
>> Thanks a lot for reporting this problem here Cristian!
>>
>> I am not super familiar with the involved components, but the behavior
>> you are describing doesn't sound right to me.
>> Which entrypoint are you using? This is logged at the beginning, like
>> this: "2020-09-08 14:45:32,807 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
>>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
>> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>
>> Do you know by chance if this problem is reproducible? With
>> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
>> problem.
>>
>>
>>
>>
>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>>
>> Hi Cristian,
>>
>>
>> I don't know if it was designed to be like this deliberately.
>>
>> So I have already submitted an issue ,and wait for somebody to response.
>>
>> https://issues.apache.org/jira/browse/FLINK-19154
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Yang Wang <da...@gmail.com>.
> The job sub directory will be cleaned up when the job
finished/canceled/failed.
Since we can submit multiple jobs to a Flink session, what I mean is that when
a job reaches a terminal state, its sub node (e.g.
/flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1)
in ZooKeeper will be cleaned up. But the root directory
(/flink/application_xxxx/) still exists.
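
Sketched as a tree (illustrative only; the exact node names depend on your
high-availability.zookeeper.path.* settings):

```
/flink/application_xxxx/                          <- cluster root, still exists
└── running_job_registry/
    └── 4d255397c7aeb5327adb567238c983c1          <- per-job sub node, removed when
                                                     the job reaches a terminal state
```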


Your current case is different (a per-job cluster). I think we need to figure
out why the only running job reached a terminal state, for example because the
restart attempts were exhausted. You could then find the following log line in
your JobManager log.

"org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy"


Best,
Yang




Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午11:26写道:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
>
> What does this mean?
>
> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>
> The only cases where I expect Flink to clean up the checkpoint data from
> ZK is when I explicitly stop or cancel the job (in those cases the job
> manager takes a savepoint before cleaning up zk and finishing the cluster).
>
> Which is not the case here. Flink was on autopilot here and decided to
> wipe my poor, good checkpoint metadata as the logs show.
>
> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>
> AFAIK, the HA data, including Zookeeper meta data and real data on DFS,
> will only be cleaned up
> when the Flink cluster reached terminated state.
>
> So if you are using a session cluster, the root cluster node on Zk will be
> cleaned up after you manually
> stop the session cluster. The job sub directory will be cleaned up when
> the job finished/canceled/failed.
>
> If you are using a job/application cluster, once the only running job
> finished/failed, all the HA data will
> be cleaned up. I think you need to check the job restart strategy you have
> set. For example, the following
> configuration will make the Flink cluster terminated after 10 attempts.
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
>
>
> Best,
> Yang
>
> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:
>
>
> I'm using the standalone script to start the cluster.
>
> As far as I can tell, it's not easy to reproduce. We found that zookeeper
> lost a node around the time this happened, but all of our other 75 Flink
> jobs which use the same setup, version and zookeeper, didn't have any
> issues. They didn't even restart.
>
> So unfortunately I don't know how to reproduce this. All I know is I can't
> sleep. I have nightmares were my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckon is near. Flinkpocalypse!
>
> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>
> Thanks a lot for reporting this problem here Cristian!
>
> I am not super familiar with the involved components, but the behavior you
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>
> Do you know by chance if this problem is reproducible? With
> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
> problem.
>
>
>
>
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>
> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Cristian <kn...@fastmail.fm>.
> The job sub directory will be cleaned up when the job finished/canceled/failed.

What does this mean? 

Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the time... and yet, the jobs would ALWAYS resume from the last checkpoint. 

The only case where I expect Flink to clean up the checkpoint data from ZK is when I explicitly stop or cancel the job (in those cases the job manager takes a savepoint before cleaning up ZK and finishing the cluster). 

Which is not the case here. Flink was on autopilot here and decided to wipe my poor, good checkpoint metadata as the logs show. 

On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, will only be cleaned up
> when the Flink cluster reached terminated state.
> 
> So if you are using a session cluster, the root cluster node on Zk will be cleaned up after you manually
> stop the session cluster. The job sub directory will be cleaned up when the job finished/canceled/failed.
> 
> If you are using a job/application cluster, once the only running job finished/failed, all the HA data will
> be cleaned up. I think you need to check the job restart strategy you have set. For example, the following
> configuration will make the Flink cluster terminated after 10 attempts.
> 
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> 
> 
> Best,
> Yang
> 
> Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:
>> __
>> I'm using the standalone script to start the cluster. 
>> 
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper lost a node around the time this happened, but all of our other 75 Flink jobs which use the same setup, version and zookeeper, didn't have any issues. They didn't even restart. 
>> 
>> So unfortunately I don't know how to reproduce this. All I know is I can't sleep. I have nightmares were my precious state is deleted. I wake up crying and quickly start manually savepointing all jobs just in case, because I feel the day of reckon is near. Flinkpocalypse!
>> 
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>> Thanks a lot for reporting this problem here Cristian!
>>> 
>>> I am not super familiar with the involved components, but the behavior you are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: "2020-09-08 14:45:32,807 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>> 
>>> Do you know by chance if this problem is reproducible? With the StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>>>> Hi Cristian,
>>>> 
>>>> 
>>>> I don't know if it was designed to be like this deliberately.
>>>> 
>>>> So I have already submitted an issue ,and wait for somebody to response.
>>>> 
>>>> https://issues.apache.org/jira/browse/FLINK-19154   
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Yang Wang <da...@gmail.com>.
AFAIK, the HA data, including the ZooKeeper metadata and the real data on DFS,
will only be cleaned up when the Flink cluster has reached a terminal state.

So if you are using a session cluster, the root cluster node on Zk will be
cleaned up after you manually
stop the session cluster. The job sub directory will be cleaned up when the
job finished/canceled/failed.

If you are using a job/application cluster, once the only running job
finished/failed, all the HA data will
be cleaned up. I think you need to check the job restart strategy you have
set. For example, the following
configuration will make the Flink cluster terminate after 10 attempts.

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
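
The same strategy could also be set programmatically; a minimal sketch
(assuming the Flink 1.11 DataStream API):

```
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// equivalent intent to the flink-conf.yaml entries above: give up after 10 attempts,
// which drives a per-job/application cluster to a terminal state
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
		10,                  // restart attempts before the job is declared failed
		Time.seconds(10)));  // delay between attempts (illustrative value)
```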


Best,
Yang

Cristian <kn...@fastmail.fm> 于2020年9月9日周三 上午12:28写道:

> I'm using the standalone script to start the cluster.
>
> As far as I can tell, it's not easy to reproduce. We found that zookeeper
> lost a node around the time this happened, but all of our other 75 Flink
> jobs which use the same setup, version and zookeeper, didn't have any
> issues. They didn't even restart.
>
> So unfortunately I don't know how to reproduce this. All I know is I can't
> sleep. I have nightmares were my precious state is deleted. I wake up
> crying and quickly start manually savepointing all jobs just in case,
> because I feel the day of reckon is near. Flinkpocalypse!
>
> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>
> Thanks a lot for reporting this problem here Cristian!
>
> I am not super familiar with the involved components, but the behavior you
> are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like
> this: "2020-09-08 14:45:32,807 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>
> Do you know by chance if this problem is reproducible? With
> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
> problem.
>
>
>
>
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>
> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Cristian <kn...@fastmail.fm>.
I'm using the standalone script to start the cluster. 

As far as I can tell, it's not easy to reproduce. We found that zookeeper lost a node around the time this happened, but all of our other 75 Flink jobs which use the same setup, version and zookeeper, didn't have any issues. They didn't even restart. 

So unfortunately I don't know how to reproduce this. All I know is I can't sleep. I have nightmares where my precious state is deleted. I wake up crying and quickly start manually savepointing all jobs just in case, because I feel the day of reckoning is near. Flinkpocalypse!

On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
> Thanks a lot for reporting this problem here Cristian!
> 
> I am not super familiar with the involved components, but the behavior you are describing doesn't sound right to me.
> Which entrypoint are you using? This is logged at the beginning, like this: "2020-09-08 14:45:32,807 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
> 
> Do you know by chance if this problem is reproducible? With the StandaloneSessionClusterEntrypoint I was not able to reproduce the problem.
> 
> 
> 
> 
> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:
>> Hi Cristian,
>> 
>> 
>> I don't know if it was designed to be like this deliberately.
>> 
>> So I have already submitted an issue ,and wait for somebody to response.
>> 
>> https://issues.apache.org/jira/browse/FLINK-19154   
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Robert Metzger <rm...@apache.org>.
Thanks a lot for reporting this problem here Cristian!

I am not super familiar with the involved components, but the behavior you
are describing doesn't sound right to me.
Which entrypoint are you using? This is logged at the beginning, like this:
"2020-09-08 14:45:32,807 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
 Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"

Do you know by chance if this problem is reproducible? With
the StandaloneSessionClusterEntrypoint I was not able to reproduce the
problem.




On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <56...@qq.com> wrote:

> Hi Cristian,
>
>
> I don't know if it was designed to be like this deliberately.
>
> So I have already submitted an issue ,and wait for somebody to response.
>
> https://issues.apache.org/jira/browse/FLINK-19154
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Husky Zeng <56...@qq.com>.
Hi Cristian,


I don't know if it was designed to be like this deliberately.

So I have already submitted an issue and am waiting for somebody to respond.

https://issues.apache.org/jira/browse/FLINK-19154   



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Cristian <kn...@fastmail.fm>.
That's an excellent question. I can't explain that. All I know is this:

- the job was upgraded and resumed from a savepoint 
- After hours of working fine, it failed (like it shows in the logs) 
- the Metadata was cleaned up, again as shown in the logs
- because I run this in Kubernetes, the container was restarted immediately, and because nothing was found in zookeeper it started again from the savepoint 

I didn't realize this had happened until a couple of hours later. At that point the job had already checkpointed several times, and it was futile to try to start it from a retained checkpoint (assuming there were any). 

My question is... Is this a bug or not? 

On Mon, Sep 7, 2020, at 1:53 AM, Husky Zeng wrote:
> I means that checkpoints are usually dropped after the job was terminated by
> the user (except if explicitly configured as retained Checkpoints).   You
> could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to save
> your checkpoint when te cames to failure.
> 
> When your zookeeper lost connection,the High-Availability system ,which rely
> on zookeeper was also failure, it leads to your application stop without
> retry.  
> 
> I hava a question ,  if your application lost zookeeper connection,how did
> it delete the data in zookeeper?
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Husky Zeng <56...@qq.com>.
I mean that checkpoints are usually dropped after the job is terminated by
the user (except if explicitly configured as retained checkpoints). You
could use "ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION" to keep
your checkpoints when the job comes to a failure.
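
For example, a minimal sketch with the DataStream API (assuming Flink 1.11):

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds

// keep the externalized checkpoint data when the job is cancelled or fails,
// so it can still be used to restore the job manually afterwards
env.getCheckpointConfig().enableExternalizedCheckpoints(
		CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```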

When your ZooKeeper connection was lost, the high-availability system, which
relies on ZooKeeper, also failed, and that led to your application stopping
without a retry.

I have a question: if your application lost the ZooKeeper connection, how did
it delete the data in ZooKeeper?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Cristian <kn...@fastmail.fm>.
> If you want to save your checkPoint,you could  refer to this document

What do you mean? We already persist our savepoints, and we do not delete them explicitly ever. 

The problem is that Flink deleted the data from zookeeper when it shouldn't have. Is it possible to start a job from a checkpoint using --fromSavepoint? 


On Sat, Sep 5, 2020, at 2:05 AM, Husky Zeng wrote:
> 
> Hi Cristian,
> 
> From  this code , we could see that the Exception or Error was ignored in
> dispatcher.shutDownCluster(applicationStatus) .
> 
> ``
> org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster
> 
> return applicationCompletionFuture
> 		.handle((r, t) -> {
> 			final ApplicationStatus applicationStatus;
> 			if (t != null) {
> 
> 				final Optional<JobCancellationException> cancellationException =
> 						ExceptionUtils.findThrowable(t, JobCancellationException.class);
> 
> 				if (cancellationException.isPresent()) {
> 					// this means the Flink Job was cancelled
> 					applicationStatus = ApplicationStatus.CANCELED;
> 				} else if (t instanceof CancellationException) {
> 					// this means that the future was cancelled
> 					applicationStatus = ApplicationStatus.UNKNOWN;
> 				} else {
> 					applicationStatus = ApplicationStatus.FAILED;
> 				}
> 
> 				LOG.warn("Application {}: ", applicationStatus, t);
> 			} else {
> 				applicationStatus = ApplicationStatus.SUCCEEDED;
> 				LOG.info("Application completed SUCCESSFULLY");
> 			}
> 			return dispatcher.shutDownCluster(applicationStatus);
> 		})
> 		.thenCompose(Function.identity());
> 
> ``
> 
> 
> So when it come to  java.util.concurrent.CompletableFuture#whenComplete  ,
> there is no throwable, only ApplicationStatus.FAILED , and data was cleaned
> up.
> 
> 
> ``
> 	clusterComponent.getShutDownFuture().whenComplete(
> 		(ApplicationStatus applicationStatus, Throwable throwable) -> {
> 			if (throwable != null) {
> 				shutDownAsync(
> 					ApplicationStatus.UNKNOWN,
> 					ExceptionUtils.stringifyException(throwable),
> 					false);
> 			} else {
> 				// This is the general shutdown path. If a separate more specific
> shutdown was
> 				// already triggered, this will do nothing
> 				shutDownAsync(
> 					applicationStatus,
> 					null,
> 					true);
> 			}
> 		});
> }
> 
> ``
> 
> If you want to save your checkPoint,you could  refer to this document:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html
> 
> In another way,you could change the code in 
>  org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster,when
> it came to faied,save the data.
> 
> In fact, I'm wondering why it ignore the Throwable,default to delete Ha Data
> in any solution. Is there anyone could help me to solve this question?
> 
> Best,
> Husky Zeng
> 
> 
> 
> 
> 
> -----
> Chinese,NanJing , Huawei.
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Husky Zeng <56...@qq.com>.
Hi Cristian,

From this code, we can see that the Exception or Error is ignored in
dispatcher.shutDownCluster(applicationStatus).

``
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster

return applicationCompletionFuture
		.handle((r, t) -> {
			final ApplicationStatus applicationStatus;
			if (t != null) {

				final Optional<JobCancellationException> cancellationException =
						ExceptionUtils.findThrowable(t, JobCancellationException.class);

				if (cancellationException.isPresent()) {
					// this means the Flink Job was cancelled
					applicationStatus = ApplicationStatus.CANCELED;
				} else if (t instanceof CancellationException) {
					// this means that the future was cancelled
					applicationStatus = ApplicationStatus.UNKNOWN;
				} else {
					applicationStatus = ApplicationStatus.FAILED;
				}

				LOG.warn("Application {}: ", applicationStatus, t);
			} else {
				applicationStatus = ApplicationStatus.SUCCEEDED;
				LOG.info("Application completed SUCCESSFULLY");
			}
			return dispatcher.shutDownCluster(applicationStatus);
		})
		.thenCompose(Function.identity());

``


So when it comes to java.util.concurrent.CompletableFuture#whenComplete,
there is no throwable, only ApplicationStatus.FAILED, and the data was
cleaned up.


``
	clusterComponent.getShutDownFuture().whenComplete(
		(ApplicationStatus applicationStatus, Throwable throwable) -> {
			if (throwable != null) {
				shutDownAsync(
					ApplicationStatus.UNKNOWN,
					ExceptionUtils.stringifyException(throwable),
					false);
			} else {
				// This is the general shutdown path. If a separate more specific
				// shutdown was already triggered, this will do nothing
				shutDownAsync(
					applicationStatus,
					null,
					true);
			}
		});
}

``

If you want to keep your checkpoints, you could refer to this document:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/checkpoints.html

Alternatively, you could change the code in
org.apache.flink.runtime.dispatcher.DispatcherGateway#shutDownCluster so that
when it comes to a failure, the data is saved.

In fact, I'm wondering why it ignores the Throwable and defaults to deleting
the HA data in every case. Is there anyone who could help me answer this
question?

Best,
Husky Zeng





-----
Chinese,NanJing , Huawei.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Cristian <kn...@fastmail.fm>.
My suspicion is that somewhere in the path where it fails to connect to zookeeper, the exception is swallowed, so instead of running the shutdown path for when the job fails, the general shutdown path is taken. 
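
That would be consistent with how CompletableFuture works: once a handle() stage maps an exceptional completion to a plain status value, a downstream whenComplete() no longer sees any throwable. A tiny, self-contained illustration of just that java.util.concurrent behavior (nothing Flink-specific):

```
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionDemo {
	public static void main(String[] args) {
		CompletableFuture<Void> failing = new CompletableFuture<>();
		failing.completeExceptionally(new RuntimeException("lost ZooKeeper connection"));

		failing
			// handle() converts the exceptional completion into a normal value,
			// so the original failure stops propagating as a Throwable
			.handle((result, t) -> t != null ? "FAILED" : "SUCCEEDED")
			// downstream, throwable is null even though the original future failed
			.whenComplete((status, throwable) ->
				System.out.println("status=" + status + ", throwable=" + throwable));
		// prints: status=FAILED, throwable=null
	}
}
```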

This was fortunately a job for which we had a savepoint from yesterday. Otherwise we would have been in serious trouble. 


On Fri, Sep 4, 2020, at 9:12 PM, Qingdong Zeng wrote:
> Hi Cristian,
> 
> In the log,we can see it went to the method
> shutDownAsync(applicationStatus,null,true);
> 					
> ``   
> 2020-09-04 17:32:07,950 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
> StandaloneApplicationClusterEntryPoint down with application status FAILED.
> Diagnostics null.
> ``   
> 
> In general shutdown path,default to clean up HaData is normal. So the
> problem is not why we clean up HaData in general shutdown path,but why it
> went to the general shutdown path when your cluster fails.
> 
> I am going to have lunch , and plan to  analyze the log in the afternoon.
> 
> Best,
> Qingdong Zeng
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Checkpoint metadata deleted by Flink after ZK connection issues

Posted by Qingdong Zeng <56...@qq.com>.
Hi Cristian,

In the log, we can see that it went to the method
shutDownAsync(applicationStatus, null, true);
					
``   
2020-09-04 17:32:07,950 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting
StandaloneApplicationClusterEntryPoint down with application status FAILED.
Diagnostics null.
``   

In the general shutdown path, cleaning up the HA data by default is normal. So
the problem is not why we clean up the HA data in the general shutdown path,
but why it went to the general shutdown path when your cluster failed.

I am going to have lunch, and plan to analyze the log in the afternoon.

Best,
Qingdong Zeng



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/