You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/04/02 13:17:04 UTC
[curator] 01/02: CURATOR-525
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-525-fix-lost-state-race
in repository https://gitbox.apache.org/repos/asf/curator.git
commit ba83de17ea9c99ff5fb73a8efa8d939ceda0d1f9
Author: randgalt <ra...@apache.org>
AuthorDate: Tue Mar 31 20:22:43 2020 -0500
CURATOR-525
There is a race whereby the ZooKeeper connection can be healed before Curator has finished processing the new connection state. When this happens,
the Curator instance becomes a zombie stuck in the LOST state. This fix is a "hack": ConnectionStateManager notices when the connection state is
LOST but the Curator instance reports that it is connected. When this happens, a warning is logged and the connection is reset.
---
.../framework/imps/CuratorFrameworkImpl.java | 27 ++++++++++++++++
.../framework/state/ConnectionStateManager.java | 18 +++++++++++
.../curator/framework/imps/TestFrameworkEdges.java | 36 ++++++++++++++++++++++
3 files changed, 81 insertions(+)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bfe61bf..1abfc28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -816,6 +816,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
return ensembleTracker;
}
+ @VisibleForTesting
+ volatile CountDownLatch debugCheckBackgroundRetryLatch;
+ @VisibleForTesting
+ volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
+ @VisibleForTesting
+ volatile KeeperException.Code injectedCode;
+
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
@@ -851,6 +858,26 @@ public class CuratorFrameworkImpl implements CuratorFramework
e = new Exception("Unknown result codegetResultCode()");
}
+ if ( debugCheckBackgroundRetryLatch != null ) // scaffolding to test CURATOR-525
+ {
+ if ( debugCheckBackgroundRetryReadyLatch != null )
+ {
+ debugCheckBackgroundRetryReadyLatch.countDown();
+ }
+ try
+ {
+ debugCheckBackgroundRetryLatch.await();
+ if (injectedCode != null)
+ {
+ code = injectedCode;
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
validateConnection(codeToState(code));
logError("Background operation retry gave up", e);
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 7285431..32ddb78 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -285,6 +285,24 @@ public class ConnectionStateManager implements Closeable
checkSessionExpiration();
}
}
+
+ synchronized(this)
+ {
+ if ( (currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected() )
+ {
+ // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
+ // this "hack" fixes it by resetting the connection
+ log.warn("ConnectionState is LOST but isConnected() is true. Resetting connection.");
+ try
+ {
+ client.getZookeeperClient().reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset connection after LOST/isConnected mismatch");
+ }
+ }
+ }
}
catch ( InterruptedException e )
{
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 5a7c415..6fcd553 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -75,6 +75,42 @@ public class TestFrameworkEdges extends BaseClassForTests
System.setProperty("zookeeper.extendedTypesEnabled", "true");
}
+ @Test(description = "test case for CURATOR-525")
+ public void testValidateConnectionEventRaces() throws Exception
+ {
+ // test for CURATOR-525 - there is a race whereby Curator can go to LOST
+ // after the connection has been repaired. Prior to the fix, the Curator
+ // instance would become a zombie, never leaving the LOST state
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new RetryOneTime(1)))
+ {
+ CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client;
+
+ client.start();
+ client.getChildren().forPath("/");
+ client.create().forPath("/foo");
+
+ BlockingQueue<ConnectionState> stateQueue = new LinkedBlockingQueue<>();
+ client.getConnectionStateListenable().addListener((__, newState) -> stateQueue.add(newState));
+
+ server.stop();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.SUSPENDED);
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+
+ clientImpl.debugCheckBackgroundRetryReadyLatch = new CountDownLatch(1);
+ clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1);
+
+ client.delete().guaranteed().inBackground().forPath("/foo");
+ timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch);
+ server.restart();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+ clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED; // simulate an expiration being handled after the connection is repaired
+ clientImpl.debugCheckBackgroundRetryLatch.countDown();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+ }
+ }
+
@Test
public void testInjectSessionExpiration() throws Exception
{