Posted to dev@curator.apache.org by GitBox <gi...@apache.org> on 2022/10/07 13:50:40 UTC

[GitHub] [curator] XComp commented on a diff in pull request #398: CURATOR-653: fix potential double leader for LeaderLatch

XComp commented on code in PR #398:
URL: https://github.com/apache/curator/pull/398#discussion_r990006956


##########
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java:
##########
@@ -540,10 +540,17 @@ public String getLastPathIsLeader()
     @VisibleForTesting
     volatile CountDownLatch debugResetWaitLatch = null;
 
+    @VisibleForTesting
+    volatile CountDownLatch debugRestWaitBeforeNodeDelete = null;

Review Comment:
   ```suggestion
       volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
   ```
   There's a typo in the name (`Rest` instead of `Reset`). Additionally, we might want to add `Latch` at the end to reflect the purpose of this member, analogous to the other latches.
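   For context on how such `@VisibleForTesting` hooks are usually wired: the test assigns a fresh `CountDownLatch`, and the production code waits on it at one specific step. The sketch below only illustrates that general pattern under that assumption. `ResettableTask`, `reset()` and `deleteNode()` are hypothetical names, not Curator's actual `LeaderLatch` code.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical class illustrating a test-only pause point; this is not Curator's LeaderLatch.
   class ResettableTask {
       // Tests may assign a latch here to pause reset() just before the delete step.
       // volatile, so an assignment made by the test thread is visible to the resetting thread.
       volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;

       // Records whether the (simulated) node deletion has happened.
       final AtomicBoolean nodeDeleted = new AtomicBoolean(false);

       void reset() throws InterruptedException {
           // Read the field once so a concurrent reassignment cannot cause a null dereference.
           CountDownLatch pause = debugResetWaitBeforeNodeDeleteLatch;
           if (pause != null) {
               // Block until the test calls countDown(), letting it interleave other actions.
               pause.await();
           }
           deleteNode();
       }

       private void deleteNode() {
           nodeDeleted.set(true);
       }
   }
   ```

   The `Latch` suffix makes it obvious at the call site that the test has to release the field with `countDown()`.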



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");

Review Comment:
   ```suggestion
                       assertEquals("true", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "The first LeaderLatch instance should acquire leadership.");
   ```
   nit: maybe add a bit more context to this poll to describe the test case.
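   Here is a dependency-free sketch of the same idea. It polls the queue with a bounded timeout and fails with a descriptive message that also distinguishes a timeout from a wrong value. `pollAndExpect` is a hypothetical helper, not part of Curator or JUnit. It takes the expected value first and the message last, mirroring JUnit 5's `assertEquals(expected, actual, message)`.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical test helper, not part of Curator or JUnit.
   final class QueueAssertions {
       private QueueAssertions() {}

       /** Waits up to timeoutMs for the next event and fails with context if it is missing or wrong. */
       static void pollAndExpect(BlockingQueue<String> queue, String expected,
                                 long timeoutMs, String message) throws InterruptedException {
           String actual = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
           if (actual == null) {
               // poll() returns null when nothing arrived within the timeout.
               throw new AssertionError(message + " (timed out after " + timeoutMs
                       + " ms waiting for '" + expected + "')");
           }
           if (!expected.equals(actual)) {
               throw new AssertionError(message + " (expected '" + expected
                       + "' but was '" + actual + "')");
           }
       }
   }
   ```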



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };

Review Comment:
   Using the `LeaderLatch` ID in the event labels (as suggested in the comment on the connection-state listener) might help when evaluating the queue later in the test. To be fair, though, asserting on `hasLeadership` as is already done below (lines 303-304) serves the same purpose. I'm just mentioning it as another idea. :shrug: 



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1
+        LeaderLatch latch1 = latches.get(0);
+        LeaderLatch latch2 = latches.get(1);
+        assertTrue(latch1.hasLeadership());
+        assertFalse(latch2.hasLeadership());
+        try {
+            latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1);
+            latch2.debugResetWaitLatch = new CountDownLatch(1);
+            latch1.debugResetWaitLatch = new CountDownLatch(1);
+
+            // force latch1 and latch2 reset
+            latch1.reset();
+            ForkJoinPool.commonPool().submit(() -> {

Review Comment:
   Should we add a comment here explaining why we call `latch2.reset()` in a separate thread? AFAIU, it's done so that the test's thread doesn't block on `latch2.debugRestWaitBeforeNodeDelete`. Reflecting this in a comment might help readers. WDYT?
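   The sketch below shows why the blocking call has to run elsewhere, assuming `reset()` pauses on a debug latch as described. If the call ran on the test thread, that thread would never reach the `countDown()` that releases it. The sketch uses a plain `ExecutorService` rather than `ForkJoinPool.commonPool()`, and the hypothetical `blockingReset` task stands in for `latch2.reset()`.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   // Hypothetical illustration; blockingReset stands in for latch2.reset().
   final class OffThreadReset {
       private OffThreadReset() {}

       static String run() throws Exception {
           CountDownLatch pauseBeforeDelete = new CountDownLatch(1);
           ExecutorService executor = Executors.newSingleThreadExecutor();
           try {
               // Runs on another thread: it blocks until the latch is released, so running it
               // on the test thread would hang before countDown() below is ever reached.
               Future<String> blockingReset = executor.submit(() -> {
                   pauseBeforeDelete.await();
                   return "reset finished";
               });

               // The test thread stays free to drive the interleaving under test.
               boolean doneWhilePaused = blockingReset.isDone();
               pauseBeforeDelete.countDown();

               // Bounded wait so a bug cannot hang the whole test run.
               String result = blockingReset.get(5, TimeUnit.SECONDS);
               return doneWhilePaused ? "unexpected" : result;
           } finally {
               executor.shutdownNow();
           }
       }
   }
   ```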



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1
+        LeaderLatch latch1 = latches.get(0);
+        LeaderLatch latch2 = latches.get(1);

Review Comment:
   ```suggestion
           LeaderLatch initialLeaderLatch = latches.get(0);
           LeaderLatch initialNonLeaderLatch = latches.get(1);
   ```
   nit: maybe make the variable names more descriptive to avoid confusion, especially because the order here is switched compared to what is described in the PR description and the corresponding ticket.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();

Review Comment:
   What's the purpose of waiting here? :thinking: 
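   If the sleep is only meant to let both latches settle, a bounded wait on the actual condition is usually less flaky than a fixed sleep. In this test the condition might be something like `latch1.hasLeadership() && !latch2.hasLeadership()`. Below is a sketch with a hypothetical `waitUntil` helper; it is not part of Curator's `Timing` class.

   ```java
   import java.util.function.BooleanSupplier;

   // Hypothetical polling helper; not part of Curator's Timing class.
   final class Conditions {
       private Conditions() {}

       /** Returns true as soon as the condition holds, or false once timeoutMs has elapsed. */
       static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs)
               throws InterruptedException {
           long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
           while (true) {
               if (condition.getAsBoolean()) {
                   return true;
               }
               // Overflow-safe comparison of nanoTime values.
               if (System.nanoTime() - deadline >= 0) {
                   return false;
               }
               Thread.sleep(pollIntervalMs);
           }
       }
   }
   ```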



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1

Review Comment:
   nit: Moving comments into assert messages improves the test output while still serving as documentation. This comment could become the message of the `assertTrue` and `assertFalse` calls in lines 284-285 below, describing the currently expected state.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){

Review Comment:
   Why are we swallowing exceptions here? Shouldn't we expose them as part of the test run if something goes wrong? :thinking: As written, the test passes silently if an exception is thrown in this block, because the `catch` just returns.
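   One possible alternative, sketched below: drop the `catch` so failures propagate (the test method already declares `throws Exception`), and release whatever was started in a `finally` block. `FakeClient`, `setUpClients` and `runTest` are hypothetical stand-ins for the `CuratorFramework`/`LeaderLatch` setup, not real APIs.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch; FakeClient stands in for CuratorFramework/LeaderLatch.
   final class SetupWithoutSwallowing {
       private SetupWithoutSwallowing() {}

       static final class FakeClient implements AutoCloseable {
           boolean closed;

           FakeClient(int id, boolean failOnStart) {
               if (failOnStart) {
                   throw new IllegalStateException("client " + id + " failed to start");
               }
           }

           @Override
           public void close() {
               closed = true;
           }
       }

       /** Starts count clients; a failure propagates to the caller instead of being swallowed. */
       static void setUpClients(int count, int failingIndex, List<FakeClient> started) {
           for (int i = 0; i < count; ++i) {
               started.add(new FakeClient(i, i == failingIndex));
           }
       }

       /** Runs the setup and always closes whatever was started, even when setup throws. */
       static void runTest(int count, int failingIndex, List<FakeClient> started) {
           try {
               setUpClients(count, failingIndex, started);
               // ... the actual test steps would follow here ...
           } finally {
               for (FakeClient client : started) {
                   client.close();
               }
           }
       }
   }
   ```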



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());

Review Comment:
   nit: Adding the `LeaderLatch` ID here (we could use the loop variable `i`) might help later in the test when interpreting the state of the queue.
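   One detail to keep in mind: `i` is modified by `++i`, so it is not effectively final and cannot be captured directly by the anonymous listeners. It has to be copied into a local first. The sketch below shows that pattern with plain `Consumer<String>` listeners. `LabelledEvents` and `createListeners` are hypothetical; in the test, the listener bodies would be something like `states.add(newState.name() + "-" + latchId)`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.function.Consumer;

   // Hypothetical sketch of ID-labelled events; not Curator code.
   final class LabelledEvents {
       private LabelledEvents() {}

       /** Builds one listener per latch; each suffixes its events with that latch's ID. */
       static List<Consumer<String>> createListeners(int count, BlockingQueue<String> events) {
           List<Consumer<String>> listeners = new ArrayList<>();
           for (int i = 0; i < count; ++i) {
               // The loop variable is not effectively final, so copy it before capturing it.
               final int latchId = i;
               listeners.add(event -> events.add(event + "-" + latchId));
           }
           return listeners;
       }
   }
   ```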



-- 
This is an automated message from the Apache Git Service.