Posted to issues@flink.apache.org by "Zhu Zhu (JIRA)" <ji...@apache.org> on 2019/04/11 02:48:00 UTC

[jira] [Comment Edited] (FLINK-11813) Standby per job mode Dispatchers don't know job's JobSchedulingStatus

    [ https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815027#comment-16815027 ] 

Zhu Zhu edited comment on FLINK-11813 at 4/11/19 2:47 AM:
----------------------------------------------------------

Hi Till, for your 2 questions above:

1. If one job with jobID xxx terminates, and later another job with the same jobID is submitted, I think Flink can regard it as a valid submission. In our production use there is currently a way for the client to re-submit a previously generated JobGraph to speed up job launching when the previous job has FAILED. In this case, jobs with the same ID are seen as different attempts of the same job.

   We do not handle the unexpected duplicate submission case in which the second submission arrives after the first one has completed. I'm not sure in what case this could happen?
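To make the scenario above concrete, below is a minimal sketch of how a Dispatcher could consult the registry status at submission time to tell a valid re-submission from an unexpected duplicate. The class and method names ({{SimpleRunningJobsRegistry}}, {{getStatus}}, {{submitJob}}) are simplified assumptions for illustration, not Flink's actual {{RunningJobsRegistry}} API:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for Flink's RunningJobsRegistry; names and statuses are illustrative only. */
class SimpleRunningJobsRegistry {

    enum JobSchedulingStatus { NONE, PENDING, RUNNING }

    private final Map<String, JobSchedulingStatus> statuses = new ConcurrentHashMap<>();

    JobSchedulingStatus getStatus(String jobId) {
        return statuses.getOrDefault(jobId, JobSchedulingStatus.NONE);
    }

    void setStatus(String jobId, JobSchedulingStatus status) {
        statuses.put(jobId, status);
    }

    void clearJob(String jobId) {
        statuses.remove(jobId);
    }
}

/** Sketch of how a Dispatcher could decide whether to accept a submission. */
class DispatcherSubmissionSketch {

    private final SimpleRunningJobsRegistry registry = new SimpleRunningJobsRegistry();

    /** Returns true if the submission is accepted as a new attempt. */
    boolean submitJob(String jobId) {
        switch (registry.getStatus(jobId)) {
            case NONE:
                // First submission, or a re-submission after the previous attempt
                // terminated and its registry entry was cleared: accept it as a new attempt.
                registry.setStatus(jobId, SimpleRunningJobsRegistry.JobSchedulingStatus.PENDING);
                return true;
            case PENDING:
            case RUNNING:
                // The same job id is still pending or running: reject the duplicate submission.
                return false;
            default:
                return false;
        }
    }
}
{code}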

2. The process would be like this:
 # submitting the job -> setting the status in the RunningJobsRegistry to PENDING in the *Dispatcher* (null/NONE -> PENDING)
 # creating and launching the JobManagerRunner, which will try to acquire the HA leadership
 # once a JobManager is granted leadership, it changes the job status in the RunningJobsRegistry to RUNNING and starts the JobMaster (or creates a new JobMaster as proposed in FLINK-11719) (PENDING -> RUNNING)
 # when the job terminates, the JobManager removes the job from the RunningJobsRegistry (RUNNING -> NONE)

So if it is the first time the JM is launched, the job status is PENDING and the job will be started. If leadership is gained a second time and the first run has already completed, the job status will be NONE, so the job will not be re-executed. Besides, if a JM failover happens while the status is PENDING or RUNNING, the new leader will also restart the job (see the sketch below).
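For illustration, the leadership-gain decision described above could look roughly like the following sketch. The enum values and method names are simplified assumptions rather than Flink's real JobManagerRunner/RunningJobsRegistry code:

{code:java}
/**
 * Sketch of the decision a JobManagerRunner/Dispatcher could make when it is
 * granted leadership, based on the registry transitions listed in the steps above.
 * The enum and method names are simplified assumptions, not Flink's actual API.
 */
class LeadershipGrantSketch {

    enum JobSchedulingStatus { NONE, PENDING, RUNNING }

    void grantLeadership(String jobId, JobSchedulingStatus status) {
        switch (status) {
            case PENDING:
                // First launch: the job was registered at submission but no JobMaster
                // has executed it yet -> start the job (PENDING -> RUNNING).
                System.out.println("Starting job " + jobId);
                break;
            case RUNNING:
                // A previous leader was executing the job and failed over:
                // the new leader restarts (recovers) the job.
                System.out.println("Recovering job " + jobId + " after JM failover");
                break;
            case NONE:
                // The previous execution terminated and removed its registry entry:
                // do not re-execute the job.
                System.out.println("Job " + jobId + " has already finished; not restarting");
                break;
        }
    }
}
{code}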

 

I totally agree that "the main problem is that the entries of the {{RunningJobsRegistry}} are bound to the lifecycle of the {{JobManagerRunner}}/{{Job}} instead of the {{Dispatcher}}". I think the job submission in the Dispatcher is the beginning of the job's lifecycle.

 

I agree with your proposal too, which handles the unexpected duplicate submissions well.

One thing to confirm: since in stage 5 the job status is already set to RUNNING during job submission, should we restart the job in stage 3 only if its status is RUNNING?

 

 


> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode will restart a terminated job after they gained leadership. The problem is that we currently clear the {{RunningJobsRegistry}} once a job has reached a globally terminal state. After the leading {{Dispatcher}} terminates, a standby {{Dispatcher}} will gain leadership. Without having the information from the {{RunningJobsRegistry}} it cannot tell whether the job has been executed or whether the {{Dispatcher}} needs to re-execute the job. At the moment, the {{Dispatcher}} will assume that there was a fault and hence re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job has been successfully executed. One trivial solution could be to not clean up the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)