You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@curator.apache.org by ti...@apache.org on 2022/09/27 03:01:35 UTC
[curator] branch master updated: CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 4b96f18b CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)
4b96f18b is described below
commit 4b96f18b311485d006f65c1462d2cdd0b5838c84
Author: tison <wa...@gmail.com>
AuthorDate: Tue Sep 27 11:01:28 2022 +0800
CURATOR-644. CURATOR-645. Fix livelock in LeaderLatch (#430)
Signed-off-by: tison <wa...@gmail.com>
Co-authored-by: Matthias Pohl <ma...@aiven.io>
---
curator-recipes/pom.xml | 6 ++
.../framework/recipes/leader/LeaderLatch.java | 38 +++++++++++--
.../framework/recipes/leader/TestLeaderLatch.java | 64 +++++++++++++++++++++-
curator-test-zk35/pom.xml | 6 ++
4 files changed, 107 insertions(+), 7 deletions(-)
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index d27d7fa3..b84a9489 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -86,6 +86,12 @@
<artifactId>commons-math</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 5d1c249b..e8187cec 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -190,6 +190,12 @@ public class LeaderLatch implements Closeable
close(closeMode);
}
+ @VisibleForTesting
+ void closeOnDemand() throws IOException
+ {
+ internalClose(closeMode, false);
+ }
+
/**
* Remove this instance from the leadership election. If this instance is the leader, leadership
* is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
@@ -198,9 +204,25 @@ public class LeaderLatch implements Closeable
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
- public synchronized void close(CloseMode closeMode) throws IOException
+ public void close(CloseMode closeMode) throws IOException
{
- Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+ internalClose(closeMode, true);
+ }
+
+ private synchronized void internalClose(CloseMode closeMode, boolean failOnClosed) throws IOException
+ {
+ if (!state.compareAndSet(State.STARTED, State.CLOSED))
+ {
+ if (failOnClosed)
+ {
+ throw new IllegalStateException("Already closed or has not been started");
+ }
+ else
+ {
+ return;
+ }
+ }
+
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
cancelStartTask();
@@ -586,6 +608,9 @@ public class LeaderLatch implements Closeable
final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+
+ log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
+
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -604,7 +629,7 @@ public class LeaderLatch implements Closeable
@Override
public void process(WatchedEvent event)
{
- if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+ if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted )
{
try
{
@@ -626,8 +651,8 @@ public class LeaderLatch implements Closeable
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
- // previous node is gone - reset
- reset();
+ // previous node is gone - retry getChildren
+ getChildren();
}
}
};
@@ -669,7 +694,7 @@ public class LeaderLatch implements Closeable
{
if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
{
- reset();
+ getChildren();
}
}
catch ( Exception e )
@@ -717,6 +742,7 @@ public class LeaderLatch implements Closeable
private void setNode(String newValue) throws Exception
{
String oldPath = ourPath.getAndSet(newValue);
+ log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
if ( oldPath != null )
{
client.delete().guaranteed().inBackground().forPath(oldPath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d64e7cfe..671e3c4b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,6 +29,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
@@ -46,6 +47,7 @@ import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -218,6 +220,62 @@ public class TestLeaderLatch extends BaseClassForTests
}
}
+ @Test
+ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
+ {
+ final String latchPath = "/foo/bar";
+ final Timing2 timing = new Timing2();
+ final Duration pollInterval = Duration.ofMillis(100);
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ LeaderLatch latchInitialLeader = new LeaderLatch(client, latchPath, "initial-leader");
+ LeaderLatch latchCandidate0 = new LeaderLatch(client, latchPath, "candidate-0");
+ LeaderLatch latchCandidate1 = new LeaderLatch(client, latchPath, "candidate-1");
+
+ try
+ {
+ latchInitialLeader.start();
+
+ // we want to make sure that the leader gets leadership before other instances are going to join the party
+ waitForALeader(Collections.singletonList(latchInitialLeader), new Timing());
+ // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0
+ latchCandidate0.debugCheckLeaderShipLatch = new CountDownLatch(1);
+ latchCandidate0.start();
+
+ final int expectedChildrenAfterCandidate0Joins = 2;
+ Awaitility.await("There should be " + expectedChildrenAfterCandidate0Joins + " child nodes created after candidate #0 joins the leader election.")
+ .pollInterval(pollInterval)
+ .pollInSameThread()
+ .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate0Joins);
+ // no extra CountDownLatch needs to be set here because candidate #1 will rely on candidate #0
+ latchCandidate1.start();
+
+ final int expectedChildrenAfterCandidate1Joins = 3;
+ Awaitility.await("There should be " + expectedChildrenAfterCandidate1Joins + " child nodes created after candidate #1 joins the leader election.")
+ .pollInterval(pollInterval)
+ .pollInSameThread()
+ .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate1Joins);
+
+ // triggers the removal of the corresponding child node after candidate #0 retrieved the children
+ latchInitialLeader.close();
+
+ latchCandidate0.debugCheckLeaderShipLatch.countDown();
+
+ waitForALeader(Arrays.asList(latchCandidate0, latchCandidate1), new Timing());
+
+ assertTrue(latchCandidate0.hasLeadership() ^ latchCandidate1.hasLeadership());
+ }
+ finally
+ {
+ for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1))
+ {
+ latchToClose.closeOnDemand();
+ }
+ }
+ }
+ }
+
@Test
public void testSessionErrorPolicy() throws Exception
{
@@ -248,7 +306,8 @@ public class TestLeaderLatch extends BaseClassForTests
client.getConnectionStateListenable().addListener(stateListener);
client.start();
- latch = new LeaderLatch(client, "/test");
+ final String latchPatch = "/test";
+ latch = new LeaderLatch(client, latchPatch);
LeaderLatchListener listener = new LeaderLatchListener()
{
@Override
@@ -267,6 +326,7 @@ public class TestLeaderLatch extends BaseClassForTests
latch.start();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ final List<String> beforeResetChildren = client.getChildren().forPath(latchPatch);
server.stop();
if ( isSessionIteration )
{
@@ -284,6 +344,8 @@ public class TestLeaderLatch extends BaseClassForTests
server.restart();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ final List<String> afterResetChildren = client.getChildren().forPath(latchPatch);
+ assertEquals(beforeResetChildren, afterResetChildren);
}
}
finally
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 2e22ac82..088249e5 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -166,6 +166,12 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>