You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sudharsan R <su...@gmail.com> on 2022/06/18 16:51:27 UTC

Savepoint (with job cancel) while checkpoint in progress

Hello,
We are running a single job in a flink 1.11.1 cluster on a k8s cluster. We
use zookeeper HA mode.

To upgrade our application code, we do a flink cli job cancel with
savepoint. We then bring down the whole flink cluster. We bring it back up
and submit the new app code with this savepoint.

Here's a specific scenario:
1. A checkpoint was initiated by the flink infra.
2. We triggered a cancel with savepoint while the checkpoint was in
progress.
3. Based on logs, the checkpoint completes and immediately after this the
savepoint also seems to complete. At this point, my expectation is that
zookeeper would have no state for this job on this cluster.
4. The new cluster comes up. We submit a job from our savepoint. However,
the old job also seems to have been recovered! The UI shows this job. The
logs also seem to indicate this.
Please see a list of interesting events:

21:09:28 Starting job 2ddc7c290891ec2d169068d1992586d4 from savepoint …….

Jun 17, 2022 @ 21:09:25.036 Submitting Job with
JobId=2ddc7c290891ec2d169068d1992586d4.

21:08:27 Recovered JobGraph(jobId: 28e0ef806b40c27111614081e18d72f9)

21:08:27 Successfully recovered 1 persisted job graphs.

21:07:27 Starting standalonesession dameon on ….

21:07:25 New jobmanager pod comes up


21:07:14 Last message seen from old manager job

21:07:00 Cancelling tasks to cancelled messages

21:06:42 savepoint stored in ….

21:05:16 Last message of type Received last message for now expired
checkpoint attempt 101289

21:04:52 Received late message for now expired checkpoint attempt 101289 ….

21:04:49 Triggering checkpoint 101290 (type=SAVEPOINT)

21:04:48: ERROR
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy:
Could not properly discard states.

21:04:48 ERROR
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory: Could
not delete the checkpoint stream file

21:04:47 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.

21:04:37 Triggering checkpoint 101289 (type=CHECKPOINT)


I don't see any zookeeper errors around this time(server or flink logs).
The ERROR events(21:04:48) are interesting. Although, it's much before the
savepoint completion (21:06:42).


What if anything could i be possibly doing wrong? We could try to clean out
the zookeeper state prior to job submission as a safety measure. But, i
would have expected this to work neverthless.


Thanks

Sudharsan

Re: Savepoint (with job cancel) while checkpoint in progress

Posted by yu'an huang <h....@gmail.com>.
The document also suggest not cancelling a job with savepoint. Can you try to execute “flink stop -s [savepoint dir] <jobid>” and then execute “flink cancel <jobid>”? You can send us the execution logs for above two commands.  


> On 19 Jun 2022, at 10:13 PM, Sudharsan R <su...@gmail.com> wrote:
> 
> Hi Yu'an,
> We use flink 1.11.1. This version has a 'cancel' option in the CLI (https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/cli.html <https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/cli.html>)
> So, we do flink cancel -s <savepoint location> <jobId>. We have had innumerable 'job cancels'  during deployments and we have never seen anything like the sequence above. So, it's very odd.
> 
> Thanks
> Sudharsan
> 
> 
> On Sun, Jun 19, 2022 at 2:22 AM yu'an huang <h.yuan667@gmail.com <ma...@gmail.com>> wrote:
> Hi Sudharsan, 
> 
> How did you cancel thus single job. According to the High Availability Document: 
> 
> “In order to recover submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted."
> 
> So I think the job data should be deleted if you use the action “cancel” (instead of “stop") to cancel the job. Also I paste the HA and savepoint doc link below, hopes these may help you.
> HA: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/ <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/>
> Savepoint: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/ <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/>
> 
> 
> Best,
> Yuan
> 
> 
> 
>> On 19 Jun 2022, at 12:51 AM, Sudharsan R <sud.rang@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hello,
>> We are running a single job in a flink 1.11.1 cluster on a k8s cluster. We use zookeeper HA mode.
>> 
>> To upgrade our application code, we do a flink cli job cancel with savepoint. We then bring down the whole flink cluster. We bring it back up and submit the new app code with this savepoint.
>> 
>> Here's a specific scenario:
>> 1. A checkpoint was initiated by the flink infra.
>> 2. We triggered a cancel with savepoint while the checkpoint was in progress.
>> 3. Based on logs, the checkpoint completes and immediately after this the savepoint also seems to complete. At this point, my expectation is that zookeeper would have no state for this job on this cluster.
>> 4. The new cluster comes up. We submit a job from our savepoint. However, the old job also seems to have been recovered! The UI shows this job. The logs also seem to indicate this. 
>> Please see a list of interesting events:
>> 21:09:28 Starting job 2ddc7c290891ec2d169068d1992586d4 from savepoint …….
>> Jun 17, 2022 @ 21:09:25.036 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
>> 21:08:27 Recovered JobGraph(jobId: 28e0ef806b40c27111614081e18d72f9)
>> 21:08:27 Successfully recovered 1 persisted job graphs.
>> 21:07:27 Starting standalonesession dameon on ….
>> 21:07:25 New jobmanager pod comes up
>> 
>> 21:07:14 Last message seen from old manager job
>> 21:07:00 Cancelling tasks to cancelled messages
>> 21:06:42 savepoint stored in ….
>> 21:05:16 Last message of type Received last message for now expired checkpoint attempt 101289
>> 21:04:52 Received late message for now expired checkpoint attempt 101289 ….
>> 21:04:49 Triggering checkpoint 101290 (type=SAVEPOINT)
>> 21:04:48: ERROR org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy: Could not properly discard states.
>> 21:04:48 ERROR org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory: Could not delete the checkpoint stream file 
>> 21:04:47 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
>> 21:04:37 Triggering checkpoint 101289 (type=CHECKPOINT)
>> 
>> I don't see any zookeeper errors around this time(server or flink logs). The ERROR events(21:04:48) are interesting. Although, it's much before the savepoint completion (21:06:42).
>> 
>> What if anything could i be possibly doing wrong? We could try to clean out the zookeeper state prior to job submission as a safety measure. But, i would have expected this to work neverthless.
>> 
>> Thanks
>> Sudharsan
>> 
> 


Re: Savepoint (with job cancel) while checkpoint in progress

Posted by Sudharsan R <su...@gmail.com>.
Hi Yu'an,
We use flink 1.11.1. This version has a 'cancel' option in the CLI (
https://nightlies.apache.org/flink/flink-docs-release-1.11/ops/cli.html)
So, we do flink cancel -s <savepoint location> <jobId>. We have had
innumerable 'job cancels'  during deployments and we have never seen
anything like the sequence above. So, it's very odd.

Thanks
Sudharsan


On Sun, Jun 19, 2022 at 2:22 AM yu'an huang <h....@gmail.com> wrote:

> Hi Sudharsan,
>
> How did you cancel thus single job. According to the High Availability
> Document:
>
> “In order to recover submitted jobs, Flink persists metadata and the job
> artifacts. The HA data will be kept until the respective job either
> succeeds, is cancelled or fails terminally. Once this happens, all the HA
> data, including the metadata stored in the HA services, will be deleted."
>
> So I think the job data should be deleted if you use the action “cancel”
> (instead of “stop") to cancel the job. Also I paste the HA and savepoint
> doc link below, hopes these may help you.
> HA:
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/
> Savepoint:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/
>
>
> Best,
> Yuan
>
>
>
> On 19 Jun 2022, at 12:51 AM, Sudharsan R <su...@gmail.com> wrote:
>
> Hello,
> We are running a single job in a flink 1.11.1 cluster on a k8s cluster. We
> use zookeeper HA mode.
>
> To upgrade our application code, we do a flink cli job cancel with
> savepoint. We then bring down the whole flink cluster. We bring it back up
> and submit the new app code with this savepoint.
>
> Here's a specific scenario:
> 1. A checkpoint was initiated by the flink infra.
> 2. We triggered a cancel with savepoint while the checkpoint was in
> progress.
> 3. Based on logs, the checkpoint completes and immediately after this the
> savepoint also seems to complete. At this point, my expectation is that
> zookeeper would have no state for this job on this cluster.
> 4. The new cluster comes up. We submit a job from our savepoint. However,
> the old job also seems to have been recovered! The UI shows this job. The
> logs also seem to indicate this.
> Please see a list of interesting events:
> 21:09:28 Starting job 2ddc7c290891ec2d169068d1992586d4 from savepoint …….
> Jun 17, 2022 @ 21:09:25.036 Submitting Job with
> JobId=2ddc7c290891ec2d169068d1992586d4.
> 21:08:27 Recovered JobGraph(jobId: 28e0ef806b40c27111614081e18d72f9)
> 21:08:27 Successfully recovered 1 persisted job graphs.
> 21:07:27 Starting standalonesession dameon on ….
> 21:07:25 New jobmanager pod comes up
>
> 21:07:14 Last message seen from old manager job
> 21:07:00 Cancelling tasks to cancelled messages
> 21:06:42 savepoint stored in ….
> 21:05:16 Last message of type Received last message for now expired
> checkpoint attempt 101289
> 21:04:52 Received late message for now expired checkpoint attempt 101289 ….
> 21:04:49 Triggering checkpoint 101290 (type=SAVEPOINT)
> 21:04:48: ERROR
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy:
> Could not properly discard states.
> 21:04:48 ERROR
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory: Could
> not delete the checkpoint stream file
> 21:04:47 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
>
> 21:04:37 Triggering checkpoint 101289 (type=CHECKPOINT)
>
> I don't see any zookeeper errors around this time(server or flink logs).
> The ERROR events(21:04:48) are interesting. Although, it's much before the
> savepoint completion (21:06:42).
>
> What if anything could i be possibly doing wrong? We could try to clean
> out the zookeeper state prior to job submission as a safety measure. But, i
> would have expected this to work neverthless.
>
> Thanks
> Sudharsan
>
>
>

Re: Savepoint (with job cancel) while checkpoint in progress

Posted by yu'an huang <h....@gmail.com>.
Hi Sudharsan, 

How did you cancel thus single job. According to the High Availability Document: 

“In order to recover submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted."

So I think the job data should be deleted if you use the action “cancel” (instead of “stop") to cancel the job. Also I paste the HA and savepoint doc link below, hopes these may help you.
HA: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/overview/
Savepoint: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/


Best,
Yuan



> On 19 Jun 2022, at 12:51 AM, Sudharsan R <su...@gmail.com> wrote:
> 
> Hello,
> We are running a single job in a flink 1.11.1 cluster on a k8s cluster. We use zookeeper HA mode.
> 
> To upgrade our application code, we do a flink cli job cancel with savepoint. We then bring down the whole flink cluster. We bring it back up and submit the new app code with this savepoint.
> 
> Here's a specific scenario:
> 1. A checkpoint was initiated by the flink infra.
> 2. We triggered a cancel with savepoint while the checkpoint was in progress.
> 3. Based on logs, the checkpoint completes and immediately after this the savepoint also seems to complete. At this point, my expectation is that zookeeper would have no state for this job on this cluster.
> 4. The new cluster comes up. We submit a job from our savepoint. However, the old job also seems to have been recovered! The UI shows this job. The logs also seem to indicate this. 
> Please see a list of interesting events:
> 21:09:28 Starting job 2ddc7c290891ec2d169068d1992586d4 from savepoint …….
> Jun 17, 2022 @ 21:09:25.036 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
> 21:08:27 Recovered JobGraph(jobId: 28e0ef806b40c27111614081e18d72f9)
> 21:08:27 Successfully recovered 1 persisted job graphs.
> 21:07:27 Starting standalonesession dameon on ….
> 21:07:25 New jobmanager pod comes up
> 
> 21:07:14 Last message seen from old manager job
> 21:07:00 Cancelling tasks to cancelled messages
> 21:06:42 savepoint stored in ….
> 21:05:16 Last message of type Received last message for now expired checkpoint attempt 101289
> 21:04:52 Received late message for now expired checkpoint attempt 101289 ….
> 21:04:49 Triggering checkpoint 101290 (type=SAVEPOINT)
> 21:04:48: ERROR org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy: Could not properly discard states.
> 21:04:48 ERROR org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory: Could not delete the checkpoint stream file 
> 21:04:47 Submitting Job with JobId=2ddc7c290891ec2d169068d1992586d4.
> 21:04:37 Triggering checkpoint 101289 (type=CHECKPOINT)
> 
> I don't see any zookeeper errors around this time(server or flink logs). The ERROR events(21:04:48) are interesting. Although, it's much before the savepoint completion (21:06:42).
> 
> What if anything could i be possibly doing wrong? We could try to clean out the zookeeper state prior to job submission as a safety measure. But, i would have expected this to work neverthless.
> 
> Thanks
> Sudharsan
>