You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2022/10/18 10:08:10 UTC
[curator] branch master updated: [CURATOR-653] Proposed changes based on PR #398 (#436)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3a1453 [CURATOR-653] Proposed changes based on PR #398 (#436)
9b3a1453 is described below
commit 9b3a145372d9a8bc9a71adb630a1ad9b71fd2889
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Tue Oct 18 12:08:03 2022 +0200
[CURATOR-653] Proposed changes based on PR #398 (#436)
Co-authored-by: shixiaoxiao <sh...@shixiaoxiao1deMacBook-Pro.local>
Co-authored-by: tison <wa...@gmail.com>
---
.../framework/recipes/leader/LeaderLatch.java | 9 +-
.../framework/recipes/leader/TestLeaderLatch.java | 151 +++++++++++++++++++++
2 files changed, 159 insertions(+), 1 deletion(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index e8187cec..553e5070 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -540,10 +540,17 @@ public class LeaderLatch implements Closeable
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;
+ @VisibleForTesting
+ volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
+
@VisibleForTesting
void reset() throws Exception
{
setLeadership(false);
+ if ( debugResetWaitBeforeNodeDeleteLatch != null )
+ {
+ debugResetWaitBeforeNodeDeleteLatch.await();
+ }
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
@@ -623,6 +630,7 @@ public class LeaderLatch implements Closeable
}
else
{
+ setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@@ -726,7 +734,6 @@ public class LeaderLatch implements Closeable
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);
-
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(LeaderLatchListener::notLeader);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 671e3c4b..69deb207 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,7 +29,11 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
@@ -220,6 +224,153 @@ public class TestLeaderLatch extends BaseClassForTests
}
}
+ @Test
+ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception
+ { // Runs latch0.reset() and latch1.reset() so that they overlap (paused via the debug latches) and asserts the listener events and hasLeadership() after each step
+ final String latchPath = "/test";
+ final Timing2 timing = new Timing2();
+ final BlockingQueue<TestEvent> events = Queues.newLinkedBlockingQueue();
+
+ final List<Closeable> closeableResources = new ArrayList<>();
+ try
+ {
+ final String id0 = "id0";
+ final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events);
+ closeableResources.add(client0);
+ final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events);
+ closeableResources.add(latch0);
+
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ final String id1 = "id1";
+ final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events);
+ closeableResources.add(client1);
+ final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events);
+ closeableResources.add(latch1);
+
+ assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ // give the non-leading LeaderLatch instance (i.e. latch1) time to finish its start-up
+ // sleeping is slow but necessary: there is no handle for detecting the end of the reset call
+ timing.forWaiting().sleepABit();
+
+ assertTrue(latch0.hasLeadership());
+ assertFalse(latch1.hasLeadership());
+
+ latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1);
+ latch1.debugResetWaitLatch = new CountDownLatch(1);
+ latch0.debugResetWaitLatch = new CountDownLatch(1);
+
+ // force a reset of both latch0 and latch1 to trigger the scenario under test
+ latch0.reset();
+ // latch1.reset() has to run on a separate thread: it blocks on debugResetWaitBeforeNodeDeleteLatch directly in reset(), outside of any async call
+ ForkJoinPool.commonPool().submit(() -> {
+ latch1.reset();
+ return null;
+ }); // NOTE(review): the returned Future is dropped, so an exception thrown by latch1.reset() would go unnoticed
+
+ // latch0.reset() makes latch0 lose its leadership, delete its old child node and create a new child node before it is blocked by its debugResetWaitLatch
+ assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ // latch1.reset() is still blocked, but latch1 gains leadership because its watch on latch0's node fires when that node is deleted
+ assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ assertFalse(latch0.hasLeadership());
+ assertTrue(latch1.hasLeadership());
+
+ // latch0.reset() continues with the getChildren call, finds that it is not the leader and starts watching the node created by latch1
+ latch0.debugResetWaitLatch.countDown();
+ timing.sleepABit(); // NOTE(review): presumably lets latch0 set its watch first; the other pauses use timing.forWaiting().sleepABit() - confirm this shorter form is intended
+
+ // latch1.reset() continues, deletes its old child node and creates a new child node before it is blocked by its debugResetWaitLatch
+ latch1.debugResetWaitBeforeNodeDeleteLatch.countDown();
+
+ // latch0 receives the node-deleted event and then finds itself to be the leader
+ assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ assertTrue(latch0.hasLeadership());
+
+ // latch1.reset() continues and finds that it is not the leader
+ latch1.debugResetWaitLatch.countDown();
+ // sleeping is slow but necessary: there is no handle for detecting the end of the reset call
+ timing.forWaiting().sleepABit();
+
+ assertTrue(latch0.hasLeadership());
+ assertFalse(latch1.hasLeadership());
+ }
+ finally
+ {
+ // reverse the list so that each LeaderLatch is closed before the client it was created with
+ Collections.reverse(closeableResources);
+ closeableResources.forEach(CloseableUtils::closeQuietly);
+ }
+ }
+
+    /**
+     * Builds and starts a client for the given connect string. Timeouts are taken from
+     * {@code timing}; the client uses a {@code RetryOneTime(1)} retry policy and a
+     * {@code StandardConnectionStateErrorPolicy}.
+     *
+     * <p>Before the client is started, a connection state listener is registered that adds a
+     * {@code GAINED_CONNECTION} event tagged with {@code id} to {@code events} every time the
+     * {@code CONNECTED} state is reported.
+     *
+     * @param zkConnectString connect string handed to the client builder
+     * @param timing source of the connection and session timeouts
+     * @param id identifier stored in every event recorded for this client
+     * @param events collection that receives the recorded events
+     * @return the started client
+     */
+    private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection<TestEvent> events)
+    {
+        final CuratorFramework curator = CuratorFrameworkFactory.builder()
+            .connectString(zkConnectString)
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new RetryOneTime(1))
+            .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+            .build();
+
+        curator.getConnectionStateListenable().addListener((ignoredClient, state) -> {
+            // only the CONNECTED state is of interest to the tests
+            if ( state != ConnectionState.CONNECTED )
+            {
+                return;
+            }
+            events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
+        });
+
+        curator.start();
+        return curator;
+    }
+
+ private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection<TestEvent> events) throws Exception
+ {
+ final LeaderLatch latch = new LeaderLatch(client, latchPath, id);
+ latch.addListener(new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP));
+ }
+
+ @Override
+ public void notLeader() {
+ events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP));
+ }
+ });
+ latch.start();
+
+ return latch;
+ }
+
+    /**
+     * Kinds of events the test helpers record while a client and its latch are running.
+     */
+    private enum TestEventType
+    {
+        /** Recorded when a latch listener's {@code isLeader()} callback is invoked. */
+        GAINED_LEADERSHIP,
+
+        /** Recorded when a latch listener's {@code notLeader()} callback is invoked. */
+        LOST_LEADERSHIP,
+
+        /** Recorded when a client's connection state listener sees {@code CONNECTED}. */
+        GAINED_CONNECTION
+    }
+
+ private static class TestEvent {
+ private final String id;
+ private final TestEventType eventType;
+
+ public TestEvent(String id, TestEventType eventType)
+ {
+ this.id = id;
+ this.eventType = eventType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TestEvent testEvent = (TestEvent) o;
+ return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
+ }
+ }
+
@Test
public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
{