You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2022/10/18 10:08:10 UTC

[curator] branch master updated: [CURATOR-653] Proposed changes based on PR #398 (#436)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b3a1453 [CURATOR-653] Proposed changes based on PR #398 (#436)
9b3a1453 is described below

commit 9b3a145372d9a8bc9a71adb630a1ad9b71fd2889
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Tue Oct 18 12:08:03 2022 +0200

    [CURATOR-653] Proposed changes based on PR #398 (#436)
    
    Co-authored-by: shixiaoxiao <sh...@shixiaoxiao1deMacBook-Pro.local>
    Co-authored-by: tison <wa...@gmail.com>
---
 .../framework/recipes/leader/LeaderLatch.java      |   9 +-
 .../framework/recipes/leader/TestLeaderLatch.java  | 151 +++++++++++++++++++++
 2 files changed, 159 insertions(+), 1 deletion(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index e8187cec..553e5070 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -540,10 +540,17 @@ public class LeaderLatch implements Closeable
     @VisibleForTesting
     volatile CountDownLatch debugResetWaitLatch = null;
 
+    @VisibleForTesting
+    volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
+
     @VisibleForTesting
     void reset() throws Exception
     {
         setLeadership(false);
+        if ( debugResetWaitBeforeNodeDeleteLatch != null )
+        {
+            debugResetWaitBeforeNodeDeleteLatch.await();
+        }
         setNode(null);
 
         BackgroundCallback callback = new BackgroundCallback()
@@ -623,6 +630,7 @@ public class LeaderLatch implements Closeable
         }
         else
         {
+            setLeadership(false);
             String watchPath = sortedChildren.get(ourIndex - 1);
             Watcher watcher = new Watcher()
             {
@@ -726,7 +734,6 @@ public class LeaderLatch implements Closeable
     private synchronized void setLeadership(boolean newValue)
     {
         boolean oldValue = hasLeadership.getAndSet(newValue);
-
         if ( oldValue && !newValue )
         { // Lost leadership, was true, now false
             listeners.forEach(LeaderLatchListener::notLeader);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 671e3c4b..69deb207 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,7 +29,11 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
@@ -220,6 +224,153 @@ public class TestLeaderLatch extends BaseClassForTests
         }
     }
 
+    @Test
+    public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception
+    { // added with the CURATOR-653 fix: interleaves reset() on two latches via the debug latches and checks who reports leadership after each step
+        final String latchPath = "/test";
+        final Timing2 timing = new Timing2();
+        final BlockingQueue<TestEvent> events = Queues.newLinkedBlockingQueue(); // connection and leadership events of both participants, in arrival order
+
+        final List<Closeable> closeableResources = new ArrayList<>();
+        try
+        {
+            final String id0 = "id0";
+            final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events);
+            closeableResources.add(client0);
+            final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events);
+            closeableResources.add(latch0);
+
+            assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+            final String id1 = "id1";
+            final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events);
+            closeableResources.add(client1);
+            final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events);
+            closeableResources.add(latch1);
+
+            assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+            // wait for the non-leading LeaderLatch instance (i.e. latch1) to finish creating its node
+            // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
+            timing.forWaiting().sleepABit();
+
+            assertTrue(latch0.hasLeadership());
+            assertFalse(latch1.hasLeadership());
+
+            latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); // makes latch1.reset() block right after setLeadership(false), before setNode(null)
+            latch1.debugResetWaitLatch = new CountDownLatch(1);
+            latch0.debugResetWaitLatch = new CountDownLatch(1);
+
+            // force latch0 and latch1 reset to trigger the actual test
+            latch0.reset();
+            // latch1.reset() must run in a separate thread: it blocks synchronously on debugResetWaitBeforeNodeDeleteLatch (i.e. outside of any async callback)
+            ForkJoinPool.commonPool().submit(() -> { // NOTE(review): the returned Future is discarded, so an exception thrown by latch1.reset() would go unnoticed
+                latch1.reset();
+                return null;
+            });
+
+            // latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch
+            assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            // latch1.reset() is still blocked, but latch1 gains leadership because its watch on latch0's old node fires when that node is deleted
+            assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+            assertFalse(latch0.hasLeadership());
+            assertTrue(latch1.hasLeadership());
+
+            // latch0.reset() continues with the getChildren call, finds itself not being the leader and starts listening to the node created by latch1
+            latch0.debugResetWaitLatch.countDown();
+            timing.sleepABit(); // presumably gives latch0's background processing time to install its watch; there is no handle to wait on - TODO confirm
+
+            // latch1.reset() continues, deletes its old child node and creates a new child node before being blocked by its debugResetWaitLatch
+            latch1.debugResetWaitBeforeNodeDeleteLatch.countDown();
+
+            // latch0 receives the node-deleted event and then finds itself to be the leader
+            assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            assertTrue(latch0.hasLeadership());
+
+            // latch1.reset() continues and finds itself not being the leader
+            latch1.debugResetWaitLatch.countDown();
+            // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
+            timing.forWaiting().sleepABit();
+
+            assertTrue(latch0.hasLeadership());
+            assertFalse(latch1.hasLeadership());
+        }
+        finally
+        {
+            // reversing the list closes each LeaderLatch before the client it was created on
+            Collections.reverse(closeableResources);
+            closeableResources.forEach(CloseableUtils::closeQuietly);
+        }
+    }
+
+    private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection<TestEvent> events) { // builds and starts a client; the caller is responsible for closing it
+        final CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(zkConnectString)
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new RetryOneTime(1))
+            .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+            .build();
+
+        client.getConnectionStateListenable().addListener((client1, newState) -> { // registered before start() so the CONNECTED transition is recorded
+            if ( newState == ConnectionState.CONNECTED )
+            {
+                events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); // other connection states are deliberately not recorded
+            }
+        });
+
+        client.start();
+
+        return client;
+    }
+
+    private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection<TestEvent> events) throws Exception
+    { // creates a latch on latchPath that records every leadership gain/loss into events, then starts it; the caller closes it
+        final LeaderLatch latch = new LeaderLatch(client, latchPath, id);
+        latch.addListener(new LeaderLatchListener() { // listener is attached before start() so no leadership change is missed
+            @Override
+            public void isLeader() {
+                events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP)); // tagged with the latch's own id
+            }
+
+            @Override
+            public void notLeader() {
+                events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP));
+            }
+        });
+        latch.start();
+
+        return latch;
+    }
+
+    private enum TestEventType // kinds of events the test helpers record into the shared event queue
+    {
+        GAINED_LEADERSHIP, // recorded from LeaderLatchListener.isLeader()
+        LOST_LEADERSHIP, // recorded from LeaderLatchListener.notLeader()
+        GAINED_CONNECTION; // recorded when a client's connection state becomes CONNECTED
+    }
+
+    private static class TestEvent { // immutable (id, eventType) pair, compared by value in the assertEquals calls of the test
+        private final String id; // id of the participant the event belongs to
+        private final TestEventType eventType;
+
+        public TestEvent(String id, TestEventType eventType)
+        {
+            this.id = id;
+            this.eventType = eventType;
+        }
+
+        @Override
+        public boolean equals(Object o) { // NOTE(review): no matching hashCode() or toString() is defined - fine for assertEquals, but hash-based collections would misbehave and failure messages show the default Object.toString()
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            TestEvent testEvent = (TestEvent) o;
+            return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
+        }
+    }
+
     @Test
     public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
     {