You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Xiaogang Shi (JIRA)" <ji...@apache.org> on 2018/11/28 07:53:00 UTC

[jira] [Commented] (FLINK-10333) Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)

    [ https://issues.apache.org/jira/browse/FLINK-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701495#comment-16701495 ] 

Xiaogang Shi commented on FLINK-10333:
--------------------------------------

Hi [~till.rohrmann] Our cluster is suffering from unstable Zookeeper connections and I think this effort will help deal with some problems.

But we are still suffering from some problems in leader elections. The main cause is due the lack of atomicity. For example, JobMaster will write its address on another znode when it becomes the leader. But its leadership may already be lost when it is going to write its address (e.g., due to a long-time full GC). To alleviate the problem, many double checks are used in the code. Similar problems are also observed in the access to checkpoints. When an old job master loses its leadership, it may still have access to the checkpoints in Zookeeper and may modify them. Various methods (including locks to disallow deletion and rescanning zookeeper on restoring) are deployed to deal with these exceptions, but it does not seem to be a perfect solution.

After diving deep into the implementation of leader election in Zookeeper Recipes, i have some ideas to improve our implementation. The basic idea is that we should guarantee that only the elected leader has the access to Zookeeper. In Zookeeper Recipes, each leader contender will create an election znode which is SEQUENTIAL and EPHERMAL under a certain path. The contender with the smallest sequence number will be elected as the leader. When the elected leader fails, its election znode will disappear and the contender whose session number is smallest among the remaining contenders will be elected as the new leader. So once a contender grants the leadership,  its election znode must exist in Zookeeper. Hence, we can record the election node of each contender. Once each contender wants to modify something in Zookeeper, it must put these modification, together with the check on the existence of its election node, in a Zookeeper transaction. If the contender has already lost its leadership, the transaction will fail due to the unsatisfied check. That way, we can ensure only the elected leader has access to the states in Zookeeper.

Currently, Zookeeper Recipes does not expose any interface to access the path of election nodes. Maybe we need to reimplement the leader election with native Zookeeper interfaces in Flink without the usage of Zookeeper Recipes.

What do you think of the idea?

> Rethink ZooKeeper based stores (SubmittedJobGraph, MesosWorker, CompletedCheckpoints)
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-10333
>                 URL: https://issues.apache.org/jira/browse/FLINK-10333
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Priority: Major
>             Fix For: 1.8.0
>
>
> While going over the ZooKeeper based stores ({{ZooKeeperSubmittedJobGraphStore}}, {{ZooKeeperMesosWorkerStore}}, {{ZooKeeperCompletedCheckpointStore}}) and the underlying {{ZooKeeperStateHandleStore}} I noticed several inconsistencies which were introduced with past incremental changes.
> * Depending whether {{ZooKeeperStateHandleStore#getAllSortedByNameAndLock}} or {{ZooKeeperStateHandleStore#getAllAndLock}} is called, deserialization problems will either lead to removing the Znode or not
> * {{ZooKeeperStateHandleStore}} leaves inconsistent state in case of exceptions (e.g. {{#getAllAndLock}} won't release the acquired locks in case of a failure)
> * {{ZooKeeperStateHandleStore}} has too many responsibilities. It would be better to move {{RetrievableStateStorageHelper}} out of it for a better separation of concerns
> * {{ZooKeeperSubmittedJobGraphStore}} overwrites a stored {{JobGraph}} even if it is locked. This should not happen since it could leave another system in an inconsistent state (imagine a changed {{JobGraph}} which restores from an old checkpoint)
> * Redundant but also somewhat inconsistent put logic in the different stores
> * Shadowing of ZooKeeper specific exceptions in {{ZooKeeperStateHandleStore}} which were expected to be caught in {{ZooKeeperSubmittedJobGraphStore}}
> * Getting rid of the {{SubmittedJobGraphListener}} would be helpful
> These problems made me think how reliable these components actually work. Since these components are very important, I propose to refactor them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)