You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the "here" hyperlink was lost in this plain-text rendering).
Posted to commits@curator.apache.org by ti...@apache.org on 2022/09/27 03:01:35 UTC

[curator] branch master updated: CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b96f18b CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)
4b96f18b is described below

commit 4b96f18b311485d006f65c1462d2cdd0b5838c84
Author: tison <wa...@gmail.com>
AuthorDate: Tue Sep 27 11:01:28 2022 +0800

    CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)
    
    Signed-off-by: tison <wa...@gmail.com>
    Co-authored-by: Matthias Pohl <ma...@aiven.io>
---
 curator-recipes/pom.xml                            |  6 ++
 .../framework/recipes/leader/LeaderLatch.java      | 38 +++++++++++--
 .../framework/recipes/leader/TestLeaderLatch.java  | 64 +++++++++++++++++++++-
 curator-test-zk35/pom.xml                          |  6 ++
 4 files changed, 107 insertions(+), 7 deletions(-)

diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index d27d7fa3..b84a9489 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -86,6 +86,12 @@
             <artifactId>commons-math</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 5d1c249b..e8187cec 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -190,6 +190,12 @@ public class LeaderLatch implements Closeable
         close(closeMode);
     }
 
+    @VisibleForTesting
+    void closeOnDemand() throws IOException
+    {
+        internalClose(closeMode, false);
+    }
+
     /**
      * Remove this instance from the leadership election. If this instance is the leader, leadership
      * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
@@ -198,9 +204,25 @@ public class LeaderLatch implements Closeable
      * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
      * @throws IOException errors
      */
-    public synchronized void close(CloseMode closeMode) throws IOException
+    public void close(CloseMode closeMode) throws IOException
     {
-        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+        internalClose(closeMode, true);
+    }
+
+    private synchronized void internalClose(CloseMode closeMode, boolean failOnClosed) throws IOException
+    {
+        if (!state.compareAndSet(State.STARTED, State.CLOSED))
+        {
+            if (failOnClosed)
+            {
+                throw new IllegalStateException("Already closed or has not been started");
+            }
+            else
+            {
+                return;
+            }
+        }
+
         Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
 
         cancelStartTask();
@@ -586,6 +608,9 @@ public class LeaderLatch implements Closeable
         final String localOurPath = ourPath.get();
         List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
         int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+
+        log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
+
         if ( ourIndex < 0 )
         {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -604,7 +629,7 @@ public class LeaderLatch implements Closeable
                 @Override
                 public void process(WatchedEvent event)
                 {
-                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+                    if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted )
                     {
                         try
                         {
@@ -626,8 +651,8 @@ public class LeaderLatch implements Closeable
                 {
                     if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                     {
-                        // previous node is gone - reset
-                        reset();
+                        // previous node is gone - retry getChildren
+                        getChildren();
                     }
                 }
             };
@@ -669,7 +694,7 @@ public class LeaderLatch implements Closeable
                 {
                     if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                     {
-                        reset();
+                        getChildren();
                     }
                 }
                 catch ( Exception e )
@@ -717,6 +742,7 @@ public class LeaderLatch implements Closeable
     private void setNode(String newValue) throws Exception
     {
         String oldPath = ourPath.getAndSet(newValue);
+        log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
         if ( oldPath != null )
         {
             client.delete().guaranteed().inBackground().forPath(oldPath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d64e7cfe..671e3c4b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,6 +29,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
@@ -46,6 +47,7 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
@@ -218,6 +220,62 @@ public class TestLeaderLatch extends BaseClassForTests
         }
     }
 
+    @Test
+    public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
+    {
+        final String latchPath = "/foo/bar";
+        final Timing2 timing = new Timing2();
+        final Duration pollInterval = Duration.ofMillis(100);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+        {
+            client.start();
+            LeaderLatch latchInitialLeader = new LeaderLatch(client, latchPath, "initial-leader");
+            LeaderLatch latchCandidate0 = new LeaderLatch(client, latchPath, "candidate-0");
+            LeaderLatch latchCandidate1 = new LeaderLatch(client, latchPath, "candidate-1");
+
+            try
+            {
+                latchInitialLeader.start();
+
+                // we want to make sure that the leader gets leadership before other instances are going to join the party
+                waitForALeader(Collections.singletonList(latchInitialLeader), new Timing());
+                // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0
+                latchCandidate0.debugCheckLeaderShipLatch = new CountDownLatch(1);
+                latchCandidate0.start();
+
+                final int expectedChildrenAfterCandidate0Joins = 2;
+                Awaitility.await("There should be " + expectedChildrenAfterCandidate0Joins + " child nodes created after candidate #0 joins the leader election.")
+                        .pollInterval(pollInterval)
+                        .pollInSameThread()
+                        .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate0Joins);
+                // no extra CountDownLatch needs to be set here because candidate #1 will rely on candidate #0
+                latchCandidate1.start();
+
+                final int expectedChildrenAfterCandidate1Joins = 3;
+                Awaitility.await("There should be " + expectedChildrenAfterCandidate1Joins + " child nodes created after candidate #1 joins the leader election.")
+                        .pollInterval(pollInterval)
+                        .pollInSameThread()
+                        .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate1Joins);
+
+                // triggers the removal of the corresponding child node after candidate #0 retrieved the children
+                latchInitialLeader.close();
+
+                latchCandidate0.debugCheckLeaderShipLatch.countDown();
+
+                waitForALeader(Arrays.asList(latchCandidate0, latchCandidate1), new Timing());
+
+                assertTrue(latchCandidate0.hasLeadership() ^ latchCandidate1.hasLeadership());
+            }
+            finally
+            {
+                for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1))
+                {
+                    latchToClose.closeOnDemand();
+                }
+            }
+        }
+    }
+
     @Test
     public void testSessionErrorPolicy() throws Exception
     {
@@ -248,7 +306,8 @@ public class TestLeaderLatch extends BaseClassForTests
                 client.getConnectionStateListenable().addListener(stateListener);
                 client.start();
 
-                latch = new LeaderLatch(client, "/test");
+                final String latchPatch = "/test";
+                latch = new LeaderLatch(client, latchPatch);
                 LeaderLatchListener listener = new LeaderLatchListener()
                 {
                     @Override
@@ -267,6 +326,7 @@ public class TestLeaderLatch extends BaseClassForTests
                 latch.start();
                 assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
                 assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                final List<String> beforeResetChildren = client.getChildren().forPath(latchPatch);
                 server.stop();
                 if ( isSessionIteration )
                 {
@@ -284,6 +344,8 @@ public class TestLeaderLatch extends BaseClassForTests
                     server.restart();
                     assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
                     assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                    final List<String> afterResetChildren = client.getChildren().forPath(latchPatch);
+                    assertEquals(beforeResetChildren, afterResetChildren);
                 }
             }
             finally
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 2e22ac82..088249e5 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -166,6 +166,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>