Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2018/08/01 15:13:45 UTC

Old job resurrected during HA failover

For the second time in as many months we've had an old job resurrected
during HA failover in a 1.4.2 standalone cluster.  Failover was initiated
when the leading JM lost its connection to ZK.  I opened FLINK-10011
<https://issues.apache.org/jira/browse/FLINK-10011> with the details.

We are using S3 with the Presto adapter as our distributed store.  After we
cleaned up the cluster by shutting down the two jobs started after failover
and starting a new job from the last known good checkpoint from the single
job running in the cluster before failover, the HA recovery directory looks
as follows:

s3cmd ls s3://bucket/flink/cluster_1/recovery/
 DIR s3://bucket/flink/cluster_1/recovery/some_job/
2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

submittedJobGraph7f627a661cec appears to be job
2a4eff355aef849c5ca37dbac04f2ff1, the long-running job that failed during
the ZK failover.

submittedJobGraphf3767780c00c appears to be job
d77948df92813a68ea6dfd6783f40e7e, the job we started by restoring from a
checkpoint after shutting down the duplicate jobs.

Should submittedJobGraph7f627a661cec exist in the recovery directory if
2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?

Re: Old job resurrected during HA failover

Posted by Till Rohrmann <tr...@apache.org>.
Hi Elias and Vino,

sorry for the late reply.

I think your analysis is pretty much to the point. The current
implementation does not properly handle the situation with multiple
standby JobManagers. In the single JobManager case, a loss of leadership
either means that the JobManager has died, and with it its ZooKeeper
connection, so the ephemeral nodes disappear, or the same JobManager will
be re-elected. With multiple standby JobManagers, a loss of leadership
caused by a ZooKeeper hiccup can lead to a different JM becoming the
leader while the old leader still keeps its connection to ZooKeeper (e.g.
after re-establishing it). In that case, the ephemeral nodes won't be
deleted automatically. Consequently, it is necessary to explicitly free
all locked resources, as Elias has proposed. This problem affects the
legacy mode as well as the new mode. This is a critical issue which we
should fix asap.
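
To make that concrete, here is a rough sketch (not Flink's actual code) of
what explicitly freeing the locked resources on a leadership revocation
could look like. The store interface below is only a stand-in for
ZooKeeperStateHandleStore; the releaseAll name comes from this thread,
everything else is illustrative:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sketch only. When a JobManager loses leadership without losing its
 * ZooKeeper session, it must explicitly release the ephemeral lock nodes
 * it holds under .../jobgraphs/<jobId>; otherwise the new leader's delete
 * fails with "Directory not empty".
 */
public class LeadershipRevocationHandler {

    private static final Logger LOG =
        LoggerFactory.getLogger(LeadershipRevocationHandler.class);

    /** Stand-in for the lock-holding state handle store. */
    public interface LockedStateHandleStore {
        /** Deletes all lock nodes created by this instance. */
        void releaseAll() throws Exception;
    }

    private final LockedStateHandleStore jobGraphStore;

    public LeadershipRevocationHandler(LockedStateHandleStore jobGraphStore) {
        this.jobGraphStore = jobGraphStore;
    }

    /** To be called by the leader election service when leadership is revoked. */
    public void onLeadershipRevoked() {
        try {
            // Drop the ephemeral lock children this instance created so the
            // new leader can remove the job graph node of a canceled job.
            jobGraphStore.releaseAll();
        } catch (Exception e) {
            // Surface the failure instead of swallowing it; stale locks
            // otherwise linger until the ZooKeeper session finally expires.
            LOG.warn("Failed to release job graph locks after losing leadership.", e);
        }
    }
}

The important part is that the release happens on the revocation path
itself, because the ephemeral lock children otherwise live for as long as
the old leader's ZooKeeper session does.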

Thanks for reporting this issue and for the in-depth analysis of the
cause, Elias!

A somewhat related problem is that the actual ZooKeeper delete operation
is executed in a background thread without proper failure handling. As far
as I can tell, we only log at DEBUG level that the node could not be
deleted. I think this should be fixed as well, because then the problem
would be easier to identify.
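
For illustration, here is a minimal sketch of an asynchronous delete with
explicit failure handling, written against Curator (which Flink uses for
its ZooKeeper access). The connection string, path, and messages are
placeholders, not Flink's code:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch only: a background delete whose failures are not hidden at DEBUG. */
public class BackgroundDeleteExample {

    private static final Logger LOG =
        LoggerFactory.getLogger(BackgroundDeleteExample.class);

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String jobGraphPath =
            "/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1";

        // The callback runs once the server has answered the delete request.
        BackgroundCallback onDeleteDone = (curator, event) -> {
            KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
            if (code == KeeperException.Code.OK) {
                LOG.info("Removed job graph node {}.", event.getPath());
            } else {
                // e.g. NOTEMPTY when another JobManager still holds an
                // ephemeral lock child under the job graph node.
                LOG.warn("Could not remove job graph node {}: {}", event.getPath(), code);
            }
        };

        client.delete().inBackground(onDeleteDone).forPath(jobGraphPath);

        Thread.sleep(1000); // give the background callback time to fire
        client.close();
    }
}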

Cheers,
Till

On Fri, Aug 3, 2018 at 5:42 PM Elias Levy <fe...@gmail.com>
wrote:

> Till,
>
> Thoughts?
>
> On Wed, Aug 1, 2018 at 7:34 PM vino yang <ya...@gmail.com> wrote:
>
>> Your analysis is correct: in theory the old job graph should be deleted.
>> However, Flink currently locks the path and deletes it asynchronously, so
>> it cannot give you an acknowledgment of the deletion, and this is a risk
>> point.
>>
>> cc Till, other users have encountered this problem before. I personally
>> think that asynchronous deletion is risky, because a canceled job may be
>> revived by the JM after a failover.
>>
>

Re: Old job resurrected during HA failover

Posted by Elias Levy <fe...@gmail.com>.
Till,

Thoughts?

On Wed, Aug 1, 2018 at 7:34 PM vino yang <ya...@gmail.com> wrote:

> Your analysis is correct: in theory the old job graph should be deleted.
> However, Flink currently locks the path and deletes it asynchronously, so
> it cannot give you an acknowledgment of the deletion, and this is a risk
> point.
>
> cc Till, other users have encountered this problem before. I personally
> think that asynchronous deletion is risky, because a canceled job may be
> revived by the JM after a failover.
>

Re: Old job resurrected during HA failover

Posted by vino yang <ya...@gmail.com>.
Hi Elias,

Your analysis is correct: in theory the old job graph should be deleted.
However, Flink currently locks the path and deletes it asynchronously, so
it cannot give you an acknowledgment of the deletion, and this is a risk
point.

cc Till, other users have encountered this problem before. I personally
think that asynchronous deletion is risky, because a canceled job may be
revived by the JM after a failover.

Thanks, vino.

2018-08-02 5:25 GMT+08:00 Elias Levy <fe...@gmail.com>:

> I can see in the logs that JM 1 (10.210.22.167), the one that became
> leader after failover, thinks it deleted the
> 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:
>
> July 30th 2018, 15:32:27.231 Trying to cancel job with ID
> 2a4eff355aef849c5ca37dbac04f2ff1.
> July 30th 2018, 15:32:27.232 Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1)
> switched from state RESTARTING to CANCELED.
> July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job
> 2a4eff355aef849c5ca37dbac04f2ff1
> July 30th 2018, 15:32:27.239 Removed job graph
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
> July 30th 2018, 15:32:27.245 Removing /flink/cluster_1/checkpoints/
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
> July 30th 2018, 15:32:27.251 Removing /checkpoint-counter/
> 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
>
> Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
> and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
> no longer exist, but for some reason the job graph is still there.
>
> Looking at the ZK logs I find the problem:
>
> July 30th 2018, 15:32:27.241 Got user-level KeeperException when
> processing sessionid:0x2000001d2330001 type:delete cxid:0x434c
> zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/
> jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode =
> Directory not empty for /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1
>
> Looking in ZK, we see:
>
> [zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1
> [d833418c-891a-4b5e-b983-080be803275c]
>
> From the comments in ZooKeeperStateHandleStore.java I gather that this
> child node is used as a deletion lock.  Looking at the contents of this
> ephemeral lock node:
>
> [zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/
> 2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
> *10.210.42.62*
> cZxid = 0x60002ffa7
> ctime = Tue Jun 12 20:01:26 UTC 2018
> mZxid = 0x60002ffa7
> mtime = Tue Jun 12 20:01:26 UTC 2018
> pZxid = 0x60002ffa7
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x30000003f4a0003
> dataLength = 12
> numChildren = 0
>
> and compared to the ephemeral node lock of the currently running job:
>
> [zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/
> d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
> *10.210.22.167*
> cZxid = 0x60009df4b
> ctime = Mon Jul 30 23:01:04 UTC 2018
> mZxid = 0x60009df4b
> mtime = Mon Jul 30 23:01:04 UTC 2018
> pZxid = 0x60009df4b
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x2000001d2330001
> dataLength = 13
> numChildren = 0
>
> Assuming the content of the nodes represents the owner, it seems the job
> graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is
> locked by the previous JM leader, JM 2 (10.210.42.62), while the running
> job is locked by the current JM leader, JM 1 (10.210.22.167).
>
> Somehow the previous leader, JM 2, did not give up the lock when
> leadership failed over to JM 1.
>
> Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA
> failover to release the locks on the graphs?
>
>
> On Wed, Aug 1, 2018 at 9:49 AM Elias Levy <fe...@gmail.com>
> wrote:
>
>> Thanks for the reply.  Looking in ZK I see:
>>
>> [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
>> [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]
>>
>> Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even
>> though that job is no longer running (it was canceled while it was in a
>> loop attempting to restart, but failing because of a lack of cluster slots).
>>
>> Any idea why that may be the case?
>>
>>>

Re: Old job resurrected during HA failover

Posted by Elias Levy <fe...@gmail.com>.
I can see in the logs that JM 1 (10.210.22.167), the one that became
leader after failover, thinks it deleted
the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231 Trying to cancel job with ID
2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232 Job Some Job
(2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to
CANCELED.
July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job
2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239 Removed job graph
2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245 Removing
/flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251 Removing
/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 no
longer exist, but for some reason the job graph is still there.

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing
sessionid:0x2000001d2330001 type:delete cxid:0x434c zxid:0x60009dd94
txntype:-1 reqpath:n/a Error
Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = Directory not empty for
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this
child node is used as a deletion lock.  Looking at the contents of this
ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
*10.210.42.62*
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x30000003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get
/flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
*10.210.22.167*
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000001d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job
graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked
by the previous JM leader, JM 2 (10.210.42.62), while the running job is
locked by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership
failed over to JM 1.

Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA
failover to release the locks on the graphs?
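
For anyone who wants to see the ZooKeeper mechanics in isolation, here is
a minimal sketch using Curator directly: one session creates an ephemeral
lock child under a job graph node, and a second session then fails to
delete the parent with the same "Directory not empty" error. The ZooKeeper
address is a placeholder and the paths use a separate /flink-demo prefix
rather than a real cluster's HA root:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

/** Sketch only: shows how an ephemeral lock child blocks deletion. */
public class JobGraphLockDemo {

    public static void main(String[] args) throws Exception {
        String jobGraphNode = "/flink-demo/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1";

        CuratorFramework oldLeader = newClient();
        CuratorFramework newLeader = newClient();

        // The old leader creates the job graph node plus its ephemeral lock child.
        oldLeader.create().creatingParentsIfNeeded().forPath(jobGraphNode);
        oldLeader.create()
            .withMode(CreateMode.EPHEMERAL)
            .forPath(jobGraphNode + "/d833418c-891a-4b5e-b983-080be803275c");

        // The new leader tries to remove the job graph node after cancellation.
        try {
            newLeader.delete().forPath(jobGraphNode);
        } catch (KeeperException.NotEmptyException e) {
            // This is the situation above: the lock child survives because
            // the old leader's ZooKeeper session is still alive.
            System.out.println("Delete blocked by the old leader's lock: " + e.getMessage());
        }

        // Once the old leader's session ends, the ephemeral lock disappears
        // and the delete succeeds.
        oldLeader.close();
        Thread.sleep(1000);
        newLeader.delete().forPath(jobGraphNode);

        newLeader.close();
    }

    private static CuratorFramework newClient() {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        return client;
    }
}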


On Wed, Aug 1, 2018 at 9:49 AM Elias Levy <fe...@gmail.com>
wrote:

> Thanks for the reply.  Looking in ZK I see:
>
> [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
> [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]
>
> Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even
> though that job is no longer running (it was canceled while it was in a
> loop attempting to restart, but failing because of a lack of cluster slots).
>
> Any idea why that may be the case?
>
>>

Re: Old job resurrected during HA failover

Posted by Elias Levy <fe...@gmail.com>.
Vino,

Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though
that job is no longer running (it was canceled while it was in a loop
attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?


On Wed, Aug 1, 2018 at 8:38 AM vino yang <ya...@gmail.com> wrote:

> If a job is explicitly canceled, its job graph node in ZK should be
> deleted. However, it is worth noting that Flink uses a background thread
> to delete the job graph node asynchronously, so there may be cases where
> the deletion does not happen. On the other hand, the job graph node in ZK
> is the only basis the JM leader uses to restore jobs, so a leftover node
> can lead to an unexpected recovery, i.e. an old job being resurrected.
>

Re: Old job resurrected during HA failover

Posted by vino yang <ya...@gmail.com>.
Hi Elias,

If a job is explicitly canceled, its job graph node in ZK should be
deleted. However, it is worth noting that Flink uses a background thread
to delete the job graph node asynchronously, so there may be cases where
the deletion does not happen. On the other hand, the job graph node in ZK
is the only basis the JM leader uses to restore jobs, so a leftover node
can lead to an unexpected recovery, i.e. an old job being resurrected.
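
As a stop-gap on the operations side, a check like the sketch below can
list the job graph nodes a new leader would recover from, together with
the lock children holding them, so a stale entry for a job that is no
longer running stands out. The ZooKeeper address and HA root path are
placeholders:

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Sketch only: lists recoverable job graph nodes and their lock children. */
public class ListRecoverableJobGraphs {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String jobGraphsRoot = "/flink/cluster_1/jobgraphs";

        List<String> jobIds = client.getChildren().forPath(jobGraphsRoot);
        for (String jobId : jobIds) {
            List<String> locks = client.getChildren().forPath(jobGraphsRoot + "/" + jobId);
            // A job graph node that still has lock children cannot be deleted;
            // if the job is no longer running, those locks are stale.
            System.out.printf("job %s held by %d lock node(s): %s%n",
                jobId, locks.size(), locks);
        }

        client.close();
    }
}

Comparing the job IDs printed here against the jobs the cluster actually
reports as running would have flagged 2a4eff355aef849c5ca37dbac04f2ff1 as
a leftover before the next failover picked it up.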

Thanks, vino.

2018-08-01 23:13 GMT+08:00 Elias Levy <fe...@gmail.com>:

> For the second time in as many months we've had an old job resurrected
> during HA failover in a 1.4.2 standalone cluster.  Failover was initiated
> when the leading JM lost its connection to ZK.  I opened FLINK-10011
> <https://issues.apache.org/jira/browse/FLINK-10011> with the details.
>
> We are using S3 with the Presto adapter as our distributed store.  After
> we cleaned up the cluster by shutting down the two jobs started after
> failover and starting a new job from the last known good checkpoint from
> the single job running in the cluster before failover, the HA recovery
> directory looks as follows:
>
> s3cmd ls s3://bucket/flink/cluster_1/recovery/
>  DIR s3://bucket/flink/cluster_1/recovery/some_job/
> 2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
> 2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
> 2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
> 2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
> 2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c
>
> submittedJobGraph7f627a661cec appears to be job
> 2a4eff355aef849c5ca37dbac04f2ff1, the long-running job that failed during
> the ZK failover.
>
> submittedJobGraphf3767780c00c appears to be job
> d77948df92813a68ea6dfd6783f40e7e, the job we started by restoring from a
> checkpoint after shutting down the duplicate jobs.
>
> Should submittedJobGraph7f627a661cec exist in the recovery directory if
> 2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?
>
>
>