Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2020/08/20 04:34:16 UTC

Flink Job cluster in HA mode - recovery vs upgrade

Hello,

Let's say I run a Flink Job cluster with persistent storage and ZooKeeper HA on k8s with a single JobManager, and use externalized checkpoints. When the JM crashes, k8s will restart the JM pod, and the JM will read the JobId and JobGraph from ZK and restore from the latest checkpoint. Now let's say I want to upgrade the job binary: I delete the deployments and create new deployments referring to a newer image. Will the JM still read the JobGraph from ZK, or will it create a new one from the new job jar?

Thanks,
Alexey

Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
If you did not specify a different job or cluster id, then yes, it 
will read the graph from ZooKeeper.
Differentiating between submissions is the very purpose of job ids.
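
As a rough illustration of the rule above, here is a toy Python model. It is not Flink code: ToyJobGraphStore, graph_to_run and the id strings are invented for this sketch, and the only behaviour it assumes is the one stated in the thread, namely that a stored graph is found only when both the cluster id and the job id match.

```python
from typing import Dict, Optional, Tuple


class ToyJobGraphStore:
    """Stand-in for the HA job graph store; NOT Flink's real JobGraphStore."""

    def __init__(self) -> None:
        # Key: (cluster_id, job_id). Value: an opaque description of the graph.
        self._graphs: Dict[Tuple[str, str], str] = {}

    def put(self, cluster_id: str, job_id: str, graph: str) -> None:
        self._graphs[(cluster_id, job_id)] = graph

    def recover(self, cluster_id: str, job_id: str) -> Optional[str]:
        # Only an exact match on both ids finds a stored graph.
        return self._graphs.get((cluster_id, job_id))


def graph_to_run(store: ToyJobGraphStore, cluster_id: str, job_id: str,
                 graph_from_jar: str) -> str:
    """Prefer the stored graph; fall back to the one built from the jar."""
    stored = store.recover(cluster_id, job_id)
    return stored if stored is not None else graph_from_jar


store = ToyJobGraphStore()
store.put("cluster-a", "job-0", "graph built from old jar")

# Same ids: the old graph is recovered even though a new jar is present.
same_ids = graph_to_run(store, "cluster-a", "job-0", "graph built from new jar")
# Different cluster id: nothing is found, so the new jar's graph is used.
new_cluster_id = graph_to_run(store, "cluster-b", "job-0", "graph built from new jar")
```

In this model, changing either id is what makes a submission look new to the HA store; the contents of the jar play no part in the lookup.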



Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Alexey Trenikhun <ye...@msn.com>.
Let’s say HA is enabled, so this part works. Now we want to upgrade the job jar: we stop the job with savepoint sp2, change the manifest to specify “-s sp2” and the newer image, and create the K8s Job again. On start, will HAServices still read the job graph from ZooKeeper?


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
If HA is enabled, the cluster will continue from the latest 
externalized checkpoint.
Without HA it will still start from the savepoint.
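
The precedence described above can be sketched as a small Python function. This is only a model of the two sentences in this reply, not Flink's actual recovery code: restore_point and its parameters are hypothetical names, and the third case (HA enabled but no checkpoint recorded yet, e.g. the very first start) is an assumption that the thread does not spell out.

```python
from typing import Optional


def restore_point(ha_enabled: bool,
                  latest_checkpoint: Optional[str],
                  savepoint_arg: Optional[str]) -> Optional[str]:
    """Pick where a restarted job cluster resumes from (toy model)."""
    if ha_enabled and latest_checkpoint is not None:
        # With HA, the checkpoint recorded in the HA store wins over "-s".
        return latest_checkpoint
    # Without HA (or, by assumption, with no checkpoint yet) the "-s" value is used.
    return savepoint_arg


# Pod restarted by the Kubernetes Job controller with the original "-s sp1" spec:
with_ha = restore_point(True, "chk-42", "sp1")
without_ha = restore_point(False, "chk-42", "sp1")
first_start = restore_point(True, None, "sp1")
```

Under this reading, leaving “-s sp1” in the Job spec is harmless once HA is enabled, because it only matters when the HA store has nothing newer.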



Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Alexey Trenikhun <ye...@msn.com>.
Let’s say the job cluster was submitted as a Job from savepoint sp1, so the spec includes “-s sp1”. The job runs for days, taking externalized checkpoints every 5 minutes, then suddenly the pod fails. The Kubernetes Job controller restarts the job pod using the original job spec, which has “-s sp1”, so the Flink job will start from sp1 rather than from the latest externalized checkpoint. Is my understanding correct?


________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Sunday, August 23, 2020 1:46:45 AM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade


A job cluster is submitted as a job, not a deployment.

The built-in Job controller of Kubernetes ensures that this job finishes successfully, and if required starts new pods.


On 23/08/2020 06:43, Alexey Trenikhun wrote:
Since it is necessary to cancel with a savepoint and resume from that savepoint, it is not possible to use a Deployment (otherwise the JobManager pod will restart from the same savepoint after a crash), so we need to use a Job. But in that case, if the Job pod crashes, who will start a new instance of the Job pod? It sounds like HA with Kubernetes is currently not achievable unless some controller is used to manage the JobManager. Am I right?

________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Saturday, August 22, 2020 12:58 AM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

If, and only if, the cluster-id and JobId are identical then the JobGraph will be recovered from ZooKeeper.

On 22/08/2020 06:12, Alexey Trenikhun wrote:
I am not sure that I understand your statement that "the HaServices are only being given the JobGraph". It seems HighAvailabilityServices#getJobGraphStore provides a JobGraphStore, and potentially an implementation of JobGraphStore#recoverJobGraph(JobID jobId) for this store could build a new graph from the jar rather than read the stored graph from ZooKeeper?

Also, if there is a single job with the same job-id (job cluster), will the jobgraph of the failed job be overwritten by the new one, which will have the same job-id?

________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Friday, August 21, 2020 12:16 PM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the previous job; they will be treated as separate things.

However, you will likely end up with stale data in zookeeper (the jobgraph of the failed job).

On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph from the jar instead of reading it from ZK state? Any hints? I have a feeling that reading the JobGraph from the jar is a more resilient approach, with less chance of mistakes during an upgrade.

Thanks,
Alexey

________________________________
From: Piotr Nowojski <pn...@apache.org>
Sent: Thursday, August 20, 2020 7:04 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

Thank you for the clarification, Chesnay, and sorry for the incorrect previous answer.

Piotrek

On Thu, 20 Aug 2020 at 15:59, Chesnay Schepler <ch...@apache.org> wrote:
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment, the cluster will recover the previous JobGraph (assuming you aren't changing the ZooKeeper configuration).

If you wish to update the job, then you should cancel it (along with creating a savepoint), which will clear the ZooKeeper state, and then create a new deployment.
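
The difference between deleting the deployment and cancelling the job can be sketched with a toy Python model. Nothing here is Flink API: the dict stands in for the ZooKeeper state, and start_job, cancel_with_savepoint and the id strings are hypothetical names chosen for this sketch. The model assumes exactly what the message above states, that cancelling clears the stored state while deleting the deployment leaves it in place.

```python
from typing import Dict, Tuple

# (cluster_id, job_id) -> stored JobGraph; a stand-in for the ZooKeeper state.
HaState = Dict[Tuple[str, str], str]


def start_job(ha: HaState, cluster_id: str, job_id: str, jar_graph: str) -> str:
    """Return the graph that runs: the stored one if present, else the jar's.

    A graph built from the jar is also recorded so a later restart finds it.
    """
    return ha.setdefault((cluster_id, job_id), jar_graph)


def cancel_with_savepoint(ha: HaState, cluster_id: str, job_id: str,
                          savepoint: str) -> str:
    """Cancel the job: the stored graph is cleared and the savepoint returned."""
    ha.pop((cluster_id, job_id), None)
    return savepoint


ha: HaState = {}
start_job(ha, "cluster-a", "job-0", "graph-v1")

# Deleting only the deployment leaves the HA state untouched, so a new image
# with the same ids still runs the old graph.
after_delete = start_job(ha, "cluster-a", "job-0", "graph-v2")

# Cancelling with a savepoint clears the state; the next start uses the new jar.
sp = cancel_with_savepoint(ha, "cluster-a", "job-0", "sp2")
after_cancel = start_job(ha, "cluster-a", "job-0", "graph-v2")
```

In this model the new deployment would then be started with the returned savepoint (“-s sp2” in the manifest discussed in this thread).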

On 20/08/2020 15:43, Piotr Nowojski wrote:
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same before and after the upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

On Thu, 20 Aug 2020 at 06:34, Alexey Trenikhun <ye...@msn.com> wrote:
Hello,

Let's say I run a Flink Job cluster with persistent storage and ZooKeeper HA on k8s with a single JobManager, and use externalized checkpoints. When the JM crashes, k8s will restart the JM pod, and the JM will read the JobId and JobGraph from ZK and restore from the latest checkpoint. Now let's say I want to upgrade the job binary: I delete the deployments and create new deployments referring to a newer image. Will the JM still read the JobGraph from ZK, or will it create a new one from the new job jar?

Thanks,
Alexey





Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
A job cluster is submitted as a job, not a deployment.

The built-in Job controller of Kubernetes ensures that this job finishes 
successfully, and if required starts new pods.



On 23/08/2020 06:43, Alexey Trenikhun wrote:
> Since it is necessary to use cancel with save point/resume from save 
> point, then it is not possible to use Deployment (otherwise JobManager 
> pod will restart on crash from same save point), so we need to use 
> Job, but in that case ifJob pod is crashed who will start new instance 
> of Job pod ? Sounds like currently HA with kubernetes is not 
> achievable unless some controller is used to manage JobManager. Am I 
> right?
>
> ------------------------------------------------------------------------
> *From:* Chesnay Schepler <ch...@apache.org>
> *Sent:* Saturday, August 22, 2020 12:58 AM
> *To:* Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski 
> <pn...@apache.org>
> *Cc:* Flink User Mail List <us...@flink.apache.org>
> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
> If, and only if, the cluster-id and JobId are identical then the 
> JobGraph will be recovered from ZooKeeper.
>
> On 22/08/2020 06:12, Alexey Trenikhun wrote:
>> Not sure I that I understand your statement about "the HaServices are 
>> only being given the JobGraph", seems 
>> HighAvailabilityServices#getJobGraphStore provides JobGraphStore, and 
>> potentially implementation of JobGraphStore#recoverJobGraph(JobID 
>> jobId) for this store could build new graph for jar rather than read 
>> stored graph from ZooKeeper?
>>
>> Also, if there is single job with same job-id (job cluster), jobgraph 
>> of failed job will be over written by new one which will have same 
>> job-id?
>>
>> ------------------------------------------------------------------------
>> *From:* Chesnay Schepler <ch...@apache.org> <ma...@apache.org>
>> *Sent:* Friday, August 21, 2020 12:16 PM
>> *To:* Alexey Trenikhun <ye...@msn.com> <ma...@msn.com>; 
>> Piotr Nowojski <pn...@apache.org> <ma...@apache.org>
>> *Cc:* Flink User Mail List <us...@flink.apache.org> 
>> <ma...@flink.apache.org>
>> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
>> The HaServices are only being given the JobGraph, to this is not 
>> possible.
>>
>> Actually I have to correct myself. For a job cluster the state in HA 
>> should be irrelevant when you're submitting another jar.
>> Flink has no way of knowing that this jar is in any way connected to 
>> the previous job; they will be treated as separate things.
>>
>> However, you will likely end up with stale data in zookeeper (the 
>> jobgraph of the failed job).
>>
>> On 21/08/2020 17:51, Alexey Trenikhun wrote:
>>> Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
>>> from jar instead of reading it from ZK state. Any hints? I have 
>>> feeling that reading JobGraph from jar is more resilient approach, 
>>> less chances of mistakes during upgrade
>>>
>>> Thanks,
>>> Alexey
>>>
>>> ------------------------------------------------------------------------
>>> *From:* Piotr Nowojski <pn...@apache.org> 
>>> <ma...@apache.org>
>>> *Sent:* Thursday, August 20, 2020 7:04 AM
>>> *To:* Chesnay Schepler <ch...@apache.org> <ma...@apache.org>
>>> *Cc:* Alexey Trenikhun <ye...@msn.com> <ma...@msn.com>; 
>>> Flink User Mail List <us...@flink.apache.org> 
>>> <ma...@flink.apache.org>
>>> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
>>> Thank you for the clarification Chesney and sorry for the incorrect 
>>> previous answer.
>>>
>>> Piotrek
>>>
>>> czw., 20 sie 2020 o 15:59 Chesnay Schepler <chesnay@apache.org 
>>> <ma...@apache.org>> napisał(a):
>>>
>>>     This is incorrect; we do store the JobGraph in ZooKeeper. If you
>>>     just delete the deployment the cluster will recover the previous
>>>     JobGraph (assuming you aren't changing the Zookeeper configuration).
>>>
>>>     If you wish to update the job, then you should cancel it (along
>>>     with creating a savepoint), which will clear the Zookeeper
>>>     state, and then create a new deployment
>>>
>>>     On 20/08/2020 15:43, Piotr Nowojski wrote:
>>>>     Hi Alexey,
>>>>
>>>>     I might be wrong (I don't know this side of Flink very well),
>>>>     but as far as I know JobGraph is never stored in the ZK. It's
>>>>     always recreated from the job's JAR. So you should be able to
>>>>     upgrade the job by replacing the JAR with a newer version, as
>>>>     long as the operator UIDs are the same before and after the
>>>>     upgrade (for operator state to match before and after the upgrade).
>>>>
>>>>     Best, Piotrek
>>>>
>>>>     czw., 20 sie 2020 o 06:34 Alexey Trenikhun <yender@msn.com
>>>>     <ma...@msn.com>> napisał(a):
>>>>
>>>>         Hello,
>>>>
>>>>         Let's say I run Flink Job cluster with persistent storage
>>>>         and Zookeeper HA on k8s with single  JobManager and use
>>>>         externalized checkpoints. When JM crashes, k8s will restart
>>>>         JM pod, and JM will read JobId and JobGraph from ZK and
>>>>         restore from latest checkpoint. Now let's say I want to
>>>>         upgrade job binary, I delete deployments, create new
>>>>         deployments referring to newer image, will JM still read
>>>>         JobGraph from ZK or will create new one from new job jar?
>>>>
>>>>         Thanks,
>>>>         Alexey
>>>>
>>>
>>
>


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Alexey Trenikhun <ye...@msn.com>.
Since it is necessary to cancel with a savepoint and resume from that savepoint, it is not possible to use a Deployment (otherwise the JobManager pod will restart from the same savepoint after a crash), so we need to use a Job. But in that case, if the Job pod crashes, who will start a new instance of the Job pod? It sounds like HA with Kubernetes is currently not achievable unless some controller is used to manage the JobManager. Am I right?

________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Saturday, August 22, 2020 12:58 AM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

If, and only if, the cluster-id and JobId are identical then the JobGraph will be recovered from ZooKeeper.

On 22/08/2020 06:12, Alexey Trenikhun wrote:
Not sure that I understand your statement that "the HaServices are only being given the JobGraph". It seems HighAvailabilityServices#getJobGraphStore provides a JobGraphStore, and potentially an implementation of JobGraphStore#recoverJobGraph(JobID jobId) for this store could build a new graph from the jar rather than read the stored graph from ZooKeeper?

Also, if there is a single job with the same job-id (job cluster), will the jobgraph of the failed job be overwritten by the new one, which will have the same job-id?

________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Friday, August 21, 2020 12:16 PM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the previous job; they will be treated as separate things.

However, you will likely end up with stale data in zookeeper (the jobgraph of the failed job).

On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph from the jar instead of reading it from ZK state? Any hints? I have a feeling that recreating the JobGraph from the jar is the more resilient approach, with fewer chances for mistakes during an upgrade.

Thanks,
Alexey

________________________________
From: Piotr Nowojski <pn...@apache.org>
Sent: Thursday, August 20, 2020 7:04 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

Thank you for the clarification, Chesnay, and sorry for the incorrect previous answer.

Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <ch...@apache.org>> napisał(a):
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment the cluster will recover the previous JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along with creating a savepoint), which will clear the Zookeeper state, and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same before and after the upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun <ye...@msn.com>> napisał(a):
Hello,

Let's say I run Flink Job cluster with persistent storage and Zookeeper HA on k8s with single  JobManager and use externalized checkpoints. When JM crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from ZK and restore from latest checkpoint. Now let's say I want to upgrade job binary, I delete deployments, create new deployments referring to newer image, will JM still read JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey




Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
If, and only if, the cluster-id and JobId are identical then the 
JobGraph will be recovered from ZooKeeper.
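
The rule above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink's actual HA code: the dictionary stands in for the ZooKeeper state, and the function name, the key layout and the graph strings are all hypothetical.

```python
from typing import Dict, Tuple

# Hypothetical stand-in for the HA store: (cluster_id, job_id) -> stored graph.
HaStore = Dict[Tuple[str, str], str]


def graph_on_startup(store: HaStore, cluster_id: str, job_id: str,
                     graph_from_jar: str) -> str:
    """Return the graph the JobManager would run in this simplified model."""
    key = (cluster_id, job_id)
    if key in store:
        # Both ids match a stored entry: the stored graph is recovered
        # and the graph built from the (possibly newer) jar is ignored.
        return store[key]
    # No matching entry: use the graph from the jar and persist it.
    store[key] = graph_from_jar
    return graph_from_jar


store: HaStore = {}
print(graph_on_startup(store, "cluster-a", "job-1", "graph-v1"))  # first start
print(graph_on_startup(store, "cluster-a", "job-1", "graph-v2"))  # same ids
print(graph_on_startup(store, "cluster-b", "job-1", "graph-v2"))  # new cluster-id
```

In this model the second call still returns the old graph, because both ids match the stored entry; only the third call, with a different cluster-id, picks up the new jar.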

On 22/08/2020 06:12, Alexey Trenikhun wrote:
> Not sure that I understand your statement about "the HaServices are 
> only being given the JobGraph", seems 
> HighAvailabilityServices#getJobGraphStore provides JobGraphStore, and 
> potentially implementation of JobGraphStore#recoverJobGraph(JobID 
> jobId) for this store could build new graph for jar rather than read 
> stored graph from ZooKeeper?
>
> Also, if there is a single job with the same job-id (job cluster), will the jobgraph 
> of the failed job be overwritten by the new one, which will have the same job-id?
>
> ------------------------------------------------------------------------
> *From:* Chesnay Schepler <ch...@apache.org>
> *Sent:* Friday, August 21, 2020 12:16 PM
> *To:* Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski 
> <pn...@apache.org>
> *Cc:* Flink User Mail List <us...@flink.apache.org>
> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
> The HaServices are only being given the JobGraph, so this is not possible.
>
> Actually I have to correct myself. For a job cluster the state in HA 
> should be irrelevant when you're submitting another jar.
> Flink has no way of knowing that this jar is in any way connected to 
> the previous job; they will be treated as separate things.
>
> However, you will likely end up with stale data in zookeeper (the 
> jobgraph of the failed job).
>
> On 21/08/2020 17:51, Alexey Trenikhun wrote:
>> Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
>> from jar instead of reading it from ZK state. Any hints? I have 
>> feeling that reading JobGraph from jar is more resilient approach, 
>> less chances of mistakes during upgrade
>>
>> Thanks,
>> Alexey
>>
>> ------------------------------------------------------------------------
>> *From:* Piotr Nowojski <pn...@apache.org> 
>> <ma...@apache.org>
>> *Sent:* Thursday, August 20, 2020 7:04 AM
>> *To:* Chesnay Schepler <ch...@apache.org> <ma...@apache.org>
>> *Cc:* Alexey Trenikhun <ye...@msn.com> <ma...@msn.com>; 
>> Flink User Mail List <us...@flink.apache.org> 
>> <ma...@flink.apache.org>
>> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
>> Thank you for the clarification, Chesnay, and sorry for the incorrect 
>> previous answer.
>>
>> Piotrek
>>
>> czw., 20 sie 2020 o 15:59 Chesnay Schepler <chesnay@apache.org 
>> <ma...@apache.org>> napisał(a):
>>
>>     This is incorrect; we do store the JobGraph in ZooKeeper. If you
>>     just delete the deployment the cluster will recover the previous
>>     JobGraph (assuming you aren't changing the Zookeeper configuration).
>>
>>     If you wish to update the job, then you should cancel it (along
>>     with creating a savepoint), which will clear the Zookeeper state,
>>     and then create a new deployment
>>
>>     On 20/08/2020 15:43, Piotr Nowojski wrote:
>>>     Hi Alexey,
>>>
>>>     I might be wrong (I don't know this side of Flink very well),
>>>     but as far as I know JobGraph is never stored in the ZK. It's
>>>     always recreated from the job's JAR. So you should be able to
>>>     upgrade the job by replacing the JAR with a newer version, as
>>>     long as the operator UIDs are the same before and after the
>>>     upgrade (for operator state to match before and after the upgrade).
>>>
>>>     Best, Piotrek
>>>
>>>     czw., 20 sie 2020 o 06:34 Alexey Trenikhun <yender@msn.com
>>>     <ma...@msn.com>> napisał(a):
>>>
>>>         Hello,
>>>
>>>         Let's say I run Flink Job cluster with persistent storage
>>>         and Zookeeper HA on k8s with single  JobManager and use
>>>         externalized checkpoints. When JM crashes, k8s will restart
>>>         JM pod, and JM will read JobId and JobGraph from ZK and
>>>         restore from latest checkpoint. Now let's say I want to
>>>         upgrade job binary, I delete deployments, create new
>>>         deployments referring to newer image, will JM still read
>>>         JobGraph from ZK or will create new one from new job jar?
>>>
>>>         Thanks,
>>>         Alexey
>>>
>>
>


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Alexey Trenikhun <ye...@msn.com>.
Not sure that I understand your statement that "the HaServices are only being given the JobGraph". It seems HighAvailabilityServices#getJobGraphStore provides a JobGraphStore, and potentially an implementation of JobGraphStore#recoverJobGraph(JobID jobId) for this store could build a new graph from the jar rather than read the stored graph from ZooKeeper?

Also, if there is a single job with the same job-id (job cluster), will the jobgraph of the failed job be overwritten by the new one, which will have the same job-id?

________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Friday, August 21, 2020 12:16 PM
To: Alexey Trenikhun <ye...@msn.com>; Piotr Nowojski <pn...@apache.org>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the previous job; they will be treated as separate things.

However, you will likely end up with stale data in zookeeper (the jobgraph of the failed job).

On 21/08/2020 17:51, Alexey Trenikhun wrote:
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph from the jar instead of reading it from ZK state? Any hints? I have a feeling that recreating the JobGraph from the jar is the more resilient approach, with fewer chances for mistakes during an upgrade.

Thanks,
Alexey

________________________________
From: Piotr Nowojski <pn...@apache.org>
Sent: Thursday, August 20, 2020 7:04 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

Thank you for the clarification, Chesnay, and sorry for the incorrect previous answer.

Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <ch...@apache.org>> napisał(a):
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment the cluster will recover the previous JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along with creating a savepoint), which will clear the Zookeeper state, and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same before and after the upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun <ye...@msn.com>> napisał(a):
Hello,

Let's say I run Flink Job cluster with persistent storage and Zookeeper HA on k8s with single  JobManager and use externalized checkpoints. When JM crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from ZK and restore from latest checkpoint. Now let's say I want to upgrade job binary, I delete deployments, create new deployments referring to newer image, will JM still read JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey



Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
The HaServices are only being given the JobGraph, so this is not possible.

Actually I have to correct myself. For a job cluster the state in HA 
should be irrelevant when you're submitting another jar.
Flink has no way of knowing that this jar is in any way connected to the 
previous job; they will be treated as separate things.

However, you will likely end up with stale data in zookeeper (the 
jobgraph of the failed job).

On 21/08/2020 17:51, Alexey Trenikhun wrote:
> Is it feasible to override ZooKeeperHaServices to recreate JobGraph 
> from jar instead of reading it from ZK state. Any hints? I have 
> feeling that reading JobGraph from jar is more resilient approach, 
> less chances of mistakes during upgrade
>
> Thanks,
> Alexey
>
> ------------------------------------------------------------------------
> *From:* Piotr Nowojski <pn...@apache.org>
> *Sent:* Thursday, August 20, 2020 7:04 AM
> *To:* Chesnay Schepler <ch...@apache.org>
> *Cc:* Alexey Trenikhun <ye...@msn.com>; Flink User Mail List 
> <us...@flink.apache.org>
> *Subject:* Re: Flink Job cluster in HA mode - recovery vs upgrade
> Thank you for the clarification, Chesnay, and sorry for the incorrect 
> previous answer.
>
> Piotrek
>
> czw., 20 sie 2020 o 15:59 Chesnay Schepler <chesnay@apache.org 
> <ma...@apache.org>> napisał(a):
>
>     This is incorrect; we do store the JobGraph in ZooKeeper. If you
>     just delete the deployment the cluster will recover the previous
>     JobGraph (assuming you aren't changing the Zookeeper configuration).
>
>     If you wish to update the job, then you should cancel it (along
>     with creating a savepoint), which will clear the Zookeeper state,
>     and then create a new deployment
>
>     On 20/08/2020 15:43, Piotr Nowojski wrote:
>>     Hi Alexey,
>>
>>     I might be wrong (I don't know this side of Flink very well), but
>>     as far as I know JobGraph is never stored in the ZK. It's always
>>     recreated from the job's JAR. So you should be able to upgrade
>>     the job by replacing the JAR with a newer version, as long as the
>>     operator UIDs are the same before and after the upgrade (for
>>     operator state to match before and after the upgrade).
>>
>>     Best, Piotrek
>>
>>     czw., 20 sie 2020 o 06:34 Alexey Trenikhun <yender@msn.com
>>     <ma...@msn.com>> napisał(a):
>>
>>         Hello,
>>
>>         Let's say I run Flink Job cluster with persistent storage and
>>         Zookeeper HA on k8s with single  JobManager and use
>>         externalized checkpoints. When JM crashes, k8s will restart
>>         JM pod, and JM will read JobId and JobGraph from ZK and
>>         restore from latest checkpoint. Now let's say I want to
>>         upgrade job binary, I delete deployments, create new
>>         deployments referring to newer image, will JM still read
>>         JobGraph from ZK or will create new one from new job jar?
>>
>>         Thanks,
>>         Alexey
>>
>


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Alexey Trenikhun <ye...@msn.com>.
Is it feasible to override ZooKeeperHaServices to recreate the JobGraph from the jar instead of reading it from ZK state? Any hints? I have a feeling that recreating the JobGraph from the jar is the more resilient approach, with fewer chances for mistakes during an upgrade.

Thanks,
Alexey

________________________________
From: Piotr Nowojski <pn...@apache.org>
Sent: Thursday, August 20, 2020 7:04 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Flink Job cluster in HA mode - recovery vs upgrade

Thank you for the clarification, Chesnay, and sorry for the incorrect previous answer.

Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <ch...@apache.org>> napisał(a):
This is incorrect; we do store the JobGraph in ZooKeeper. If you just delete the deployment the cluster will recover the previous JobGraph (assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along with creating a savepoint), which will clear the Zookeeper state, and then create a new deployment

On 20/08/2020 15:43, Piotr Nowojski wrote:
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as I know JobGraph is never stored in the ZK. It's always recreated from the job's JAR. So you should be able to upgrade the job by replacing the JAR with a newer version, as long as the operator UIDs are the same before and after the upgrade (for operator state to match before and after the upgrade).

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun <ye...@msn.com>> napisał(a):
Hello,

Let's say I run Flink Job cluster with persistent storage and Zookeeper HA on k8s with single  JobManager and use externalized checkpoints. When JM crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from ZK and restore from latest checkpoint. Now let's say I want to upgrade job binary, I delete deployments, create new deployments referring to newer image, will JM still read JobGraph from ZK or will create new one from new job jar?

Thanks,
Alexey


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Piotr Nowojski <pn...@apache.org>.
Thank you for the clarification, Chesnay, and sorry for the incorrect
previous answer.

Piotrek

czw., 20 sie 2020 o 15:59 Chesnay Schepler <ch...@apache.org> napisał(a):

> This is incorrect; we do store the JobGraph in ZooKeeper. If you just
> delete the deployment the cluster will recover the previous JobGraph
> (assuming you aren't changing the Zookeeper configuration).
>
> If you wish to update the job, then you should cancel it (along with
> creating a savepoint), which will clear the Zookeeper state, and then
> create a new deployment
>
> On 20/08/2020 15:43, Piotr Nowojski wrote:
>
> Hi Alexey,
>
> I might be wrong (I don't know this side of Flink very well), but as far
> as I know JobGraph is never stored in the ZK. It's always recreated from
> the job's JAR. So you should be able to upgrade the job by replacing the
> JAR with a newer version, as long as the operator UIDs are the same before
> and after the upgrade (for operator state to match before and after the
> upgrade).
>
> Best, Piotrek
>
> czw., 20 sie 2020 o 06:34 Alexey Trenikhun <ye...@msn.com> napisał(a):
>
>> Hello,
>>
>> Let's say I run Flink Job cluster with persistent storage and Zookeeper
>> HA on k8s with single  JobManager and use externalized checkpoints. When JM
>> crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from
>> ZK and restore from latest checkpoint. Now let's say I want to upgrade job
>> binary, I delete deployments, create new deployments referring to newer
>> image, will JM still read JobGraph from ZK or will create new one from new
>> job jar?
>>
>> Thanks,
>> Alexey
>>
>
>

Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Chesnay Schepler <ch...@apache.org>.
This is incorrect; we do store the JobGraph in ZooKeeper. If you just 
delete the deployment the cluster will recover the previous JobGraph 
(assuming you aren't changing the Zookeeper configuration).

If you wish to update the job, then you should cancel it (along with 
creating a savepoint), which will clear the Zookeeper state, and then 
create a new deployment
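
The difference between the two paths can be sketched with a toy model. It is an illustration only, not Flink code: the class, its method names and the in-memory dictionary standing in for the ZooKeeper state are all hypothetical.

```python
class ToyJobCluster:
    """Simplified model of a job cluster with an HA store (hypothetical)."""

    def __init__(self):
        self.ha_store = {}    # job_id -> stored graph (stands in for ZooKeeper)
        self.savepoints = []  # names of savepoints taken so far

    def start(self, job_id, graph_from_jar):
        # A stored graph wins over the jar; otherwise the jar's graph is stored.
        return self.ha_store.setdefault(job_id, graph_from_jar)

    def delete_deployment(self):
        # Removing the pods does not touch the HA store in this model.
        pass

    def cancel_with_savepoint(self, job_id, savepoint):
        # Cancelling records a savepoint and clears the job's HA state.
        self.savepoints.append(savepoint)
        self.ha_store.pop(job_id, None)


cluster = ToyJobCluster()
cluster.start("job-1", "graph-v1")

cluster.delete_deployment()
print(cluster.start("job-1", "graph-v2"))  # old graph is recovered

cluster.cancel_with_savepoint("job-1", "sp2")
print(cluster.start("job-1", "graph-v2"))  # new graph from the new jar
```

In the model, only the cancel path removes the stored entry, which is why the new deployment that follows it runs the graph from the new jar.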

On 20/08/2020 15:43, Piotr Nowojski wrote:
> Hi Alexey,
>
> I might be wrong (I don't know this side of Flink very well), but as 
> far as I know JobGraph is never stored in the ZK. It's always 
> recreated from the job's JAR. So you should be able to upgrade the job 
> by replacing the JAR with a newer version, as long as the operator 
> UIDs are the same before and after the upgrade (for operator state to 
> match before and after the upgrade).
>
> Best, Piotrek
>
> czw., 20 sie 2020 o 06:34 Alexey Trenikhun <yender@msn.com 
> <ma...@msn.com>> napisał(a):
>
>     Hello,
>
>     Let's say I run Flink Job cluster with persistent storage and
>     Zookeeper HA on k8s with single  JobManager and use externalized
>     checkpoints. When JM crashes, k8s will restart JM pod, and JM will
>     read JobId and JobGraph from ZK and restore from latest
>     checkpoint. Now let's say I want to upgrade job binary, I delete
>     deployments, create new deployments referring to newer image, will
>     JM still read JobGraph from ZK or will create new one from new job
>     jar?
>
>     Thanks,
>     Alexey
>


Re: Flink Job cluster in HA mode - recovery vs upgrade

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Alexey,

I might be wrong (I don't know this side of Flink very well), but as far as
I know JobGraph is never stored in the ZK. It's always recreated from the
job's JAR. So you should be able to upgrade the job by replacing the JAR
with a newer version, as long as the operator UIDs are the same before and
after the upgrade (for operator state to match before and after the
upgrade).
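
The point about operator UIDs can be sketched as follows. This is a toy illustration of matching state by UID, not Flink's restore logic; the function name, the UID strings and the state values are made up for the example.

```python
def match_state(savepoint_state, new_operator_uids):
    """Split state by whether its operator UID still exists after the upgrade.

    savepoint_state: dict mapping operator UID -> state taken before the upgrade
    new_operator_uids: UIDs of the operators in the upgraded job
    """
    # State whose UID is unchanged can be handed to the same operator.
    restored = {uid: savepoint_state[uid]
                for uid in new_operator_uids if uid in savepoint_state}
    # State whose UID no longer exists has no operator to go to.
    orphaned = sorted(set(savepoint_state) - set(new_operator_uids))
    # Operators with a new UID start without any state.
    fresh = sorted(uid for uid in new_operator_uids
                   if uid not in savepoint_state)
    return restored, orphaned, fresh


old_state = {"source-1": {"offset": 42}, "window-agg": {"count": 7}}
restored, orphaned, fresh = match_state(old_state, ["source-1", "window-agg-v2"])
print(restored)  # state for "source-1" only
print(orphaned)  # ["window-agg"]
print(fresh)     # ["window-agg-v2"]
```

Renaming the UID of the aggregation operator in this example leaves its old state unmatched, which is the situation the advice about keeping UIDs stable is meant to avoid.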

Best, Piotrek

czw., 20 sie 2020 o 06:34 Alexey Trenikhun <ye...@msn.com> napisał(a):

> Hello,
>
> Let's say I run Flink Job cluster with persistent storage and Zookeeper HA
> on k8s with single  JobManager and use externalized checkpoints. When JM
> crashes, k8s will restart JM pod, and JM will read JobId and JobGraph from
> ZK and restore from latest checkpoint. Now let's say I want to upgrade job
> binary, I delete deployments, create new deployments referring to newer
> image, will JM still read JobGraph from ZK or will create new one from new
> job jar?
>
> Thanks,
> Alexey
>