Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2020/10/23 12:53:00 UTC

[jira] [Closed] (FLINK-19557) Issue retrieving leader after zookeeper session reconnect

     [ https://issues.apache.org/jira/browse/FLINK-19557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann closed FLINK-19557.
---------------------------------
    Resolution: Fixed

Fixed via

1.12.0: 594e6c03aa2942dcfe7d9ab37edacaaebf3af556
1.11.3: 751d42ca4a9642f53cb68f4b315b1a2cd000a42f
1.10.3: 933041b3d8d192391437e343857f1dec14c544d1

> Issue retrieving leader after zookeeper session reconnect
> ---------------------------------------------------------
>
>                 Key: FLINK-19557
>                 URL: https://issues.apache.org/jira/browse/FLINK-19557
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.2, 1.12.0, 1.11.2
>            Reporter: Max Mizikar
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.10.3, 1.11.3
>
>
> We have noticed an issue with leader retrieval after reconnecting to ZooKeeper. To reproduce it, break the connection between ZooKeeper and a job manager that is not the leader, then wait for the session between the two to be lost. At this point, Flink notifies about a loss of leader. After the loss of leader has occurred, reconnect the job manager to ZooKeeper. The leader is still the same as it was before, but when you try to access the REST API, you will see this:
> {code}
> $ curl -s localhost:8999/jobs
> {"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
> {code}
> I have been using `stress -t 60 -m 2048` (which spins up 2048 workers that continuously allocate and free 256MB each) to swap out the job manager and cause the connection loss.
> I have done some digging on this. The ZooKeeperLeaderRetrievalService has this code block for handling state changes:
> {code}
> 	protected void handleStateChange(ConnectionState newState) {
> 		switch (newState) {
> 			case CONNECTED:
> 				LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
> 				break;
> 			case SUSPENDED:
> 				LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
> 						"ZooKeeper.");
> 				synchronized (lock) {
> 					notifyLeaderLoss();
> 				}
> 				break;
> 			case RECONNECTED:
> 				LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
> 				break;
> 			case LOST:
> 				LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
> 						"ZooKeeper.");
> 				synchronized (lock) {
> 					notifyLeaderLoss();
> 				}
> 				break;
> 		}
> 	}
> {code}
> It calls notifyLeaderLoss() when the connection is suspended or lost, but it does nothing when the connection is re-established. It appears that Curator's NodeCache retrieves the value of the leader znode after a reconnect, but does not notify the listeners if the value is the same as before the connection loss. So, unless a leader election happens after a ZooKeeper connection loss, the job managers that are not the leader will never learn that there is a leader.
> This is the method that NodeCache calls when a new value is retrieved:
> {code}
>     private void setNewData(ChildData newData) throws InterruptedException
>     {
>         ChildData   previousData = data.getAndSet(newData);
>         if ( !Objects.equal(previousData, newData) )
>         {
>             listeners.forEach(listener -> {
>                 try
>                 {
>                     listener.nodeChanged();
>                 }
>                 catch ( Exception e )
>                 {
>                     ThreadUtils.checkInterrupted(e);
>                     log.error("Calling listener", e);
>                 }
>             });
>             if ( rebuildTestExchanger != null )
>             {
>                 try
>                 {
>                     rebuildTestExchanger.exchange(new Object());
>                 }
>                 catch ( InterruptedException e )
>                 {
>                     Thread.currentThread().interrupt();
>                 }
>             }
>         }
>     }
> {code}
> Note that the check
> {code}
>         if ( !Objects.equal(previousData, newData) )
> {code}
> seems to be what prevents the job managers from getting the leader after a ZooKeeper connection loss.
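
The interaction described above can be replayed without ZooKeeper in a small self-contained model. This is only a sketch under stated assumptions: `ChangeOnlyCache`, `RetrievalSketch`, `ReconnectDemo`, the `rereadOnReconnect` flag and the leader name `jobmanager-1` are hypothetical names introduced for illustration and are not Flink or Curator classes. The cache mimics the equality gate quoted from NodeCache, and the re-read on RECONNECTED is one possible remedy, not necessarily what the commits listed above do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a NodeCache-like cache: listeners fire only when the data changes.
class ChangeOnlyCache {
    private final List<Runnable> listeners = new ArrayList<>();
    private String data; // last value read from the simulated leader znode

    void addListener(Runnable listener) { listeners.add(listener); }

    String getCurrentData() { return data; }

    // Mirrors the equality gate in setNewData: an unchanged value produces no event.
    void setNewData(String newData) {
        String previous = data;
        data = newData;
        if (!Objects.equals(previous, newData)) {
            listeners.forEach(Runnable::run);
        }
    }
}

enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST }

// Hypothetical, heavily simplified retrieval service; not Flink's actual class.
class RetrievalSketch {
    private final ChangeOnlyCache cache;
    private final boolean rereadOnReconnect;
    private String knownLeader; // null means "no leader known"

    RetrievalSketch(ChangeOnlyCache cache, boolean rereadOnReconnect) {
        this.cache = cache;
        this.rereadOnReconnect = rereadOnReconnect;
        cache.addListener(this::readLeaderFromCache);
    }

    String knownLeader() { return knownLeader; }

    private void readLeaderFromCache() { knownLeader = cache.getCurrentData(); }

    void handleStateChange(State newState) {
        switch (newState) {
            case SUSPENDED:
            case LOST:
                knownLeader = null; // plays the role of notifyLeaderLoss()
                break;
            case RECONNECTED:
                if (rereadOnReconnect) {
                    readLeaderFromCache(); // do not wait for a change event
                }
                break;
            default:
                break;
        }
    }
}

class ReconnectDemo {
    // Replays: leader published, session lost, reconnect, same leader value read again.
    static String leaderAfterReconnect(boolean rereadOnReconnect) {
        ChangeOnlyCache cache = new ChangeOnlyCache();
        RetrievalSketch service = new RetrievalSketch(cache, rereadOnReconnect);
        cache.setNewData("jobmanager-1");             // initial leader, listener fires
        service.handleStateChange(State.LOST);        // leader is forgotten
        service.handleStateChange(State.RECONNECTED);
        cache.setNewData("jobmanager-1");             // refresh after reconnect: same value, no event
        return service.knownLeader();
    }

    public static void main(String[] args) {
        System.out.println("without re-read: " + leaderAfterReconnect(false));
        System.out.println("with re-read:    " + leaderAfterReconnect(true));
    }
}
```

In this model, the run without the re-read ends with no known leader (`null`), which matches the "ongoing leader election" symptom, while the run with the re-read recovers `jobmanager-1` even though the cache never fires a change event.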



--
This message was sent by Atlassian Jira
(v8.3.4#803005)