You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/04/02 13:17:04 UTC

[curator] 01/02: CURATOR-525

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-525-fix-lost-state-race
in repository https://gitbox.apache.org/repos/asf/curator.git

commit ba83de17ea9c99ff5fb73a8efa8d939ceda0d1f9
Author: randgalt <ra...@apache.org>
AuthorDate: Tue Mar 31 20:22:43 2020 -0500

    CURATOR-525
    
    There is a race whereby the ZooKeeper connection can be healed before Curator is finished processing the new connection state. When this happens
    the Curator instance becomes a Zombie stuck in the LOST state. This fix is a "hack". ConnectionStateManager will notice that the connection state is
    LOST but that the Curator instance reports that it is connected. When this happens, it is logged and the connection is reset.
---
 .../framework/imps/CuratorFrameworkImpl.java       | 27 ++++++++++++++++
 .../framework/state/ConnectionStateManager.java    | 18 +++++++++++
 .../curator/framework/imps/TestFrameworkEdges.java | 36 ++++++++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bfe61bf..1abfc28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -816,6 +816,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return ensembleTracker;
     }
 
+    @VisibleForTesting
+    volatile CountDownLatch debugCheckBackgroundRetryLatch;
+    @VisibleForTesting
+    volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
+    @VisibleForTesting
+    volatile KeeperException.Code injectedCode;
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
@@ -851,6 +858,26 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 e = new Exception("Unknown result codegetResultCode()");
             }
 
+            if ( debugCheckBackgroundRetryLatch != null )       // scaffolding to test CURATOR-525
+            {
+                if ( debugCheckBackgroundRetryReadyLatch != null )
+                {
+                    debugCheckBackgroundRetryReadyLatch.countDown();
+                }
+                try
+                {
+                    debugCheckBackgroundRetryLatch.await();
+                    if (injectedCode != null)
+                    {
+                        code = injectedCode;
+                    }
+                }
+                catch ( InterruptedException ex )
+                {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
             validateConnection(codeToState(code));
             logError("Background operation retry gave up", e);
         }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 7285431..32ddb78 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -285,6 +285,24 @@ public class ConnectionStateManager implements Closeable
                         checkSessionExpiration();
                     }
                 }
+
+                synchronized(this)
+                {
+                    if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() )
+                    {
+                        // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
+                        // this "hack" fixes it by resetting the connection
+                        log.warn("ConnectionState is LOST but isConnected() is true. Resetting connection.");
+                        try
+                        {
+                            client.getZookeeperClient().reset();
+                        }
+                        catch ( Exception e )
+                        {
+                            log.error("Could not reset connection after LOST/isConnected mismatch");
+                        }
+                    }
+                }
             }
             catch ( InterruptedException e )
             {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 5a7c415..6fcd553 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -75,6 +75,42 @@ public class TestFrameworkEdges extends BaseClassForTests
         System.setProperty("zookeeper.extendedTypesEnabled", "true");
     }
 
+    @Test(description = "test case for CURATOR-525")
+    public void testValidateConnectionEventRaces() throws Exception
+    {
+        // test for CURATOR-525 - there is a race whereby Curator can go to LOST
+        // after the connection has been repaired. Prior to the fix, the Curator
+        // instance would become a zombie, never leaving the LOST state
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new RetryOneTime(1)))
+        {
+            CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client;
+
+            client.start();
+            client.getChildren().forPath("/");
+            client.create().forPath("/foo");
+
+            BlockingQueue<ConnectionState> stateQueue = new LinkedBlockingQueue<>();
+            client.getConnectionStateListenable().addListener((__, newState) -> stateQueue.add(newState));
+
+            server.stop();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.SUSPENDED);
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+
+            clientImpl.debugCheckBackgroundRetryReadyLatch = new CountDownLatch(1);
+            clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1);
+
+            client.delete().guaranteed().inBackground().forPath("/foo");
+            timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch);
+            server.restart();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+            clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED;  // simulate an expiration being handled after the connection is repaired
+            clientImpl.debugCheckBackgroundRetryLatch.countDown();
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+
+            Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+        }
+    }
+
     @Test
     public void testInjectSessionExpiration() throws Exception
     {