You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Mikhail Pryakhin <m....@gmail.com> on 2018/10/25 13:30:34 UTC

Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

Hi Flink experts!

When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that? 
I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. 
But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?

Thank you in advance!

Kind Regards,
Mike Pryakhin


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

Posted by Mikhail Pryakhin <m....@gmail.com>.
Hi Till, thanks for your reply! here is the issue ticket:

https://issues.apache.org/jira/browse/FLINK-10694 <https://issues.apache.org/jira/browse/FLINK-10694>

Kind Regards,
Mike Pryakhin

> On 26 Oct 2018, at 18:29, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Mike,
> 
> thanks for reporting this issue. I think you're right that Flink leaves some empty nodes in ZooKeeper. It seems that we don't delete the <flink-job-name> node with all its children in ZooKeeperHaServices#closeAndCleanupAllData.
> 
> Could you please open a JIRA issue to in order to fix it? Thanks a lot!
> 
> Cheers,
> Till
> 
> On Fri, Oct 26, 2018 at 4:31 PM Mikhail Pryakhin <m.pryahin@gmail.com <ma...@gmail.com>> wrote:
> Hi Andrey, Thanks a lot for your reply!
> 
>> What was the full job life cycle? 
> 
> 1. The job is deployed as a YARN cluster with the following properties set
> 
> 	high-availability: zookeeper
> 	high-availability.zookeeper.quorum: <a list of zookeeper hosts>
> 	high-availability.zookeeper.storageDir: hdfs:///<recovery-folder-path> <>
> 	high-availability.zookeeper.path.root: <flink-root-path>
> 	high-availability.zookeeper.path.namespace: <flink-job-name>
> 
> 2. The job is cancelled via flink cancel <job-id> command.
> 
>    What I've noticed:
> 	when the job is running the following directory structure is created in zookeeper
> 
> 	/<flink-root-path>/<flink-job-name>/leader/resource_manager_lock
> 	/<flink-root-path>/<flink-job-name>/leader/rest_server_lock
> 	/<flink-root-path>/<flink-job-name>/leader/dispatcher_lock
> 	/<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> 	/<flink-root-path>/<flink-job-name>/checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/0000000000000000041
> 	/<flink-root-path>/<flink-job-name>/checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
> 	/<flink-root-path>/<flink-job-name>/running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
> 
> 
> 	when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:
> 
> 	/<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
> 	/<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> 	/<flink-root-path>/<flink-job-name>/checkpoints/
> 	/<flink-root-path>/<flink-job-name>/checkpoint-counter/
> 	/<flink-root-path>/<flink-job-name>/running_job_registry/
> 
>> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
> 
> I start the job with Flink-1.6.1
> 
> 
>> Was there a failover of Job Master while running before the cancelation?
> 
> no there was no failover, as the job is deployed as a YARN cluster,  (YARN Cluster High Availability guide states that no failover is required)
> 
>> What version of Zookeeper do you use?
> 
> Zookeer-3.4.10
> 
>> In general, it should not be the case and all job related data should be cleaned from Zookeeper upon cancellation.
> 
> as far as I understood the issue concerns a JobManager failover process and my question is about a manual intended cancellation of a job.
> 
> Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. 
> And it seems it only cleans up the folder running_job_registry, other folders stay untouched. I supposed that everything under the /<flink-root-path>/<flink-job-name>/ folder is cleaned up when the job is cancelled.
> 
> 
> [1] https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107 <https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107>
> [2] https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332 <https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332> 
> 
> Kind Regards,
> Mike Pryakhin
> 
>> On 26 Oct 2018, at 12:39, Andrey Zagrebin <andrey@data-artisans.com <ma...@data-artisans.com>> wrote:
>> 
>> Hi Mike,
>> 
>> What was the full job life cycle? 
>> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
>> Was there a failover of Job Master while running before the cancelation?
>> What version of Zookeeper do you use?
>> 
>> Flink creates child nodes to create a lock for the job in Zookeeper.
>> Lock is removed by removing child node (ephemeral).
>> Persistent node can be a problem because if job dies and does not remove it, 
>> persistent node will not timeout and disappear as ephemeral one 
>> and the next job instance will not delete it because it is supposed to be locked by the previous.
>> 
>> There was a recent fix in 1.6.1 where the job data was not properly deleted from Zookeeper [1].
>> In general, it should not be the case and all job related data should be cleaned from Zookeeper upon cancellation.
>> 
>> Best,
>> Andrey
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-10011 <https://issues.apache.org/jira/browse/FLINK-10011>
>> 
>>> On 25 Oct 2018, at 15:30, Mikhail Pryakhin <m.pryahin@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi Flink experts!
>>> 
>>> When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that? 
>>> I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. 
>>> But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?
>>> 
>>> Thank you in advance!
>>> 
>>> Kind Regards,
>>> Mike Pryakhin
>>> 
>> 
> 


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

Posted by Till Rohrmann <tr...@apache.org>.
Hi Mike,

thanks for reporting this issue. I think you're right that Flink leaves
some empty nodes in ZooKeeper. It seems that we don't delete the
<flink-job-name> node with all its children in
ZooKeeperHaServices#closeAndCleanupAllData.

Could you please open a JIRA issue to in order to fix it? Thanks a lot!

Cheers,
Till

On Fri, Oct 26, 2018 at 4:31 PM Mikhail Pryakhin <m....@gmail.com>
wrote:

> Hi Andrey, Thanks a lot for your reply!
>
> What was the full job life cycle?
>
>
> 1. The job is deployed as a YARN cluster with the following properties set
>
> high-availability: zookeeper
> high-availability.zookeeper.quorum: <a list of zookeeper hosts>
> high-availability.zookeeper.storageDir: hdfs:///<recovery-folder-path>
> high-availability.zookeeper.path.root: <flink-root-path>
> high-availability.zookeeper.path.namespace: <flink-job-name>
>
> 2. The job is cancelled via flink cancel <job-id> command.
>
>    What I've noticed:
> when the job is running the following directory structure is created in
> zookeeper
>
> /<flink-root-path>/<flink-job-name>/leader/resource_manager_lock
> /<flink-root-path>/<flink-job-name>/leader/rest_server_lock
> /<flink-root-path>/<flink-job-name>/leader/dispatcher_lock
>
> /<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> /<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
> /<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
> /<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
>
> /<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
>
> /<flink-root-path>/<flink-job-name>/checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/0000000000000000041
>
> /<flink-root-path>/<flink-job-name>/checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
>
> /<flink-root-path>/<flink-job-name>/running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde
>
>
> when the job is cancelled the some ephemeral nodes disappear, but most of
> them are still there:
>
> /<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde
> /<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
> /<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
> /<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
>
> /<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
> /<flink-root-path>/<flink-job-name>/checkpoints/
> /<flink-root-path>/<flink-job-name>/checkpoint-counter/
> /<flink-root-path>/<flink-job-name>/running_job_registry/
>
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
>
>
> I start the job with Flink-1.6.1
>
>
> Was there a failover of Job Master while running before the cancelation?
>
> no there was no failover, as the job is deployed as a YARN cluster,  (YARN
> Cluster High Availability guide states that no failover is required)
>
> What version of Zookeeper do you use?
>
> Zookeer-3.4.10
>
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> as far as I understood the issue concerns a JobManager failover process
> and my question is about a manual intended cancellation of a job.
>
> Here is the method [1] responsible for cleaning zookeeper folders up [1]
> which is called when the job manager has stopped [2].
> And it seems it only cleans up the folder *running_job_registry,* other
> folders stay untouched. I supposed that everything under the */<flink-root-path>/<flink-job-name>/
> *folder is cleaned up when the job is cancelled.
>
>
> [1]
> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
> [2]
> https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332
>
>
> Kind Regards,
> Mike Pryakhin
>
> On 26 Oct 2018, at 12:39, Andrey Zagrebin <an...@data-artisans.com>
> wrote:
>
> Hi Mike,
>
> What was the full job life cycle?
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0?
> Was there a failover of Job Master while running before the cancelation?
> What version of Zookeeper do you use?
>
> Flink creates child nodes to create a lock for the job in Zookeeper.
> Lock is removed by removing child node (ephemeral).
> Persistent node can be a problem because if job dies and does not remove
> it,
> persistent node will not timeout and disappear as ephemeral one
> and the next job instance will not delete it because it is supposed to be
> locked by the previous.
>
> There was a recent fix in 1.6.1 where the job data was not properly
> deleted from Zookeeper [1].
> In general, it should not be the case and all job related data should be
> cleaned from Zookeeper upon cancellation.
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-10011
>
> On 25 Oct 2018, at 15:30, Mikhail Pryakhin <m....@gmail.com> wrote:
>
> Hi Flink experts!
>
> When a streaming job with Zookeeper-HA enabled gets cancelled all the
> job-related Zookeeper nodes are not removed. Is there a reason behind that?
> I noticed that Zookeeper paths are created of type "Container Node" (an
> Ephemeral node that can have nested nodes) and fall back to Persistent node
> type in case Zookeeper doesn't support this sort of nodes.
> But anyway, it is worth removing the job Zookeeper node when a job is
> cancelled, isn't it?
>
> Thank you in advance!
>
> Kind Regards,
> Mike Pryakhin
>
>
>
>

Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

Posted by Mikhail Pryakhin <m....@gmail.com>.
Hi Andrey, Thanks a lot for your reply!

> What was the full job life cycle? 

1. The job is deployed as a YARN cluster with the following properties set

	high-availability: zookeeper
	high-availability.zookeeper.quorum: <a list of zookeeper hosts>
	high-availability.zookeeper.storageDir: hdfs:///<recovery-folder-path>
	high-availability.zookeeper.path.root: <flink-root-path>
	high-availability.zookeeper.path.namespace: <flink-job-name>

2. The job is cancelled via flink cancel <job-id> command.

   What I've noticed:
	when the job is running the following directory structure is created in zookeeper

	/<flink-root-path>/<flink-job-name>/leader/resource_manager_lock
	/<flink-root-path>/<flink-job-name>/leader/rest_server_lock
	/<flink-root-path>/<flink-job-name>/leader/dispatcher_lock
	/<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
	/<flink-root-path>/<flink-job-name>/checkpoints/5c21f00b9162becf5ce25a1cf0e67cde/0000000000000000041
	/<flink-root-path>/<flink-job-name>/checkpoint-counter/5c21f00b9162becf5ce25a1cf0e67cde
	/<flink-root-path>/<flink-job-name>/running_job_registry/5c21f00b9162becf5ce25a1cf0e67cde


	when the job is cancelled the some ephemeral nodes disappear, but most of them are still there:

	/<flink-root-path>/<flink-job-name>/leader/5c21f00b9162becf5ce25a1cf0e67cde
	/<flink-root-path>/<flink-job-name>/leaderlatch/resource_manager_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/rest_server_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/dispatcher_lock
	/<flink-root-path>/<flink-job-name>/leaderlatch/5c21f00b9162becf5ce25a1cf0e67cde/job_manager_lock
	/<flink-root-path>/<flink-job-name>/checkpoints/
	/<flink-root-path>/<flink-job-name>/checkpoint-counter/
	/<flink-root-path>/<flink-job-name>/running_job_registry/

> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 

I start the job with Flink-1.6.1


> Was there a failover of Job Master while running before the cancelation?

no there was no failover, as the job is deployed as a YARN cluster,  (YARN Cluster High Availability guide states that no failover is required)

> What version of Zookeeper do you use?

Zookeer-3.4.10

> In general, it should not be the case and all job related data should be cleaned from Zookeeper upon cancellation.

as far as I understood the issue concerns a JobManager failover process and my question is about a manual intended cancellation of a job.

Here is the method [1] responsible for cleaning zookeeper folders up [1] which is called when the job manager has stopped [2]. 
And it seems it only cleans up the folder running_job_registry, other folders stay untouched. I supposed that everything under the /<flink-root-path>/<flink-job-name>/ folder is cleaned up when the job is cancelled.


[1] https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java#L107
[2] https://github.com/apache/flink/blob/f087f57749004790b6f5b823d66822c36ae09927/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L332 

Kind Regards,
Mike Pryakhin

> On 26 Oct 2018, at 12:39, Andrey Zagrebin <an...@data-artisans.com> wrote:
> 
> Hi Mike,
> 
> What was the full job life cycle? 
> Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
> Was there a failover of Job Master while running before the cancelation?
> What version of Zookeeper do you use?
> 
> Flink creates child nodes to create a lock for the job in Zookeeper.
> Lock is removed by removing child node (ephemeral).
> Persistent node can be a problem because if job dies and does not remove it, 
> persistent node will not timeout and disappear as ephemeral one 
> and the next job instance will not delete it because it is supposed to be locked by the previous.
> 
> There was a recent fix in 1.6.1 where the job data was not properly deleted from Zookeeper [1].
> In general, it should not be the case and all job related data should be cleaned from Zookeeper upon cancellation.
> 
> Best,
> Andrey
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10011 <https://issues.apache.org/jira/browse/FLINK-10011>
> 
>> On 25 Oct 2018, at 15:30, Mikhail Pryakhin <m.pryahin@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi Flink experts!
>> 
>> When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that? 
>> I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. 
>> But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?
>> 
>> Thank you in advance!
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 
> 


Re: Flink-1.6.1 :: HighAvailability :: ZooKeeperRunningJobsRegistry

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi Mike,

What was the full job life cycle? 
Did you start it with Flink 1.6.1 or canceled job running with 1.6.0? 
Was there a failover of Job Master while running before the cancelation?
What version of Zookeeper do you use?

Flink creates child nodes to create a lock for the job in Zookeeper.
Lock is removed by removing child node (ephemeral).
Persistent node can be a problem because if job dies and does not remove it, 
persistent node will not timeout and disappear as ephemeral one 
and the next job instance will not delete it because it is supposed to be locked by the previous.

There was a recent fix in 1.6.1 where the job data was not properly deleted from Zookeeper [1].
In general, it should not be the case and all job related data should be cleaned from Zookeeper upon cancellation.

Best,
Andrey

[1] https://issues.apache.org/jira/browse/FLINK-10011

> On 25 Oct 2018, at 15:30, Mikhail Pryakhin <m....@gmail.com> wrote:
> 
> Hi Flink experts!
> 
> When a streaming job with Zookeeper-HA enabled gets cancelled all the job-related Zookeeper nodes are not removed. Is there a reason behind that? 
> I noticed that Zookeeper paths are created of type "Container Node" (an Ephemeral node that can have nested nodes) and fall back to Persistent node type in case Zookeeper doesn't support this sort of nodes. 
> But anyway, it is worth removing the job Zookeeper node when a job is cancelled, isn't it?
> 
> Thank you in advance!
> 
> Kind Regards,
> Mike Pryakhin
>