You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/04/01 01:24:25 UTC
[curator] 01/01: CURATOR-525
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-525-fix-lost-state-race
in repository https://gitbox.apache.org/repos/asf/curator.git
commit a1a6d62d82adf3d3f561aa324968bb8d46b00e92
Author: randgalt <ra...@apache.org>
AuthorDate: Tue Mar 31 20:22:43 2020 -0500
CURATOR-525
There is a race whereby the ZooKeeper connection can be healed before Curator has finished processing the new connection state. When this happens,
the Curator instance becomes a zombie, stuck in the LOST state. This fix is a "hack": ConnectionStateManager notices when the connection state is
LOST but the Curator instance reports that it is connected. When that happens, the condition is logged and the connection is reset.
---
.../framework/imps/CuratorFrameworkImpl.java | 27 ++++++++++++++++
.../framework/state/ConnectionStateManager.java | 14 +++++++--
.../curator/framework/imps/TestFrameworkEdges.java | 36 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bfe61bf..1abfc28 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -816,6 +816,13 @@ public class CuratorFrameworkImpl implements CuratorFramework
return ensembleTracker;
}
+ @VisibleForTesting
+ volatile CountDownLatch debugCheckBackgroundRetryLatch;
+ @VisibleForTesting
+ volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
+ @VisibleForTesting
+ volatile KeeperException.Code injectedCode;
+
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
@@ -851,6 +858,26 @@ public class CuratorFrameworkImpl implements CuratorFramework
e = new Exception("Unknown result codegetResultCode()");
}
+ if ( debugCheckBackgroundRetryLatch != null ) // scaffolding to test CURATOR-525
+ {
+ if ( debugCheckBackgroundRetryReadyLatch != null )
+ {
+ debugCheckBackgroundRetryReadyLatch.countDown();
+ }
+ try
+ {
+ debugCheckBackgroundRetryLatch.await();
+ if (injectedCode != null)
+ {
+ code = injectedCode;
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
validateConnection(codeToState(code));
logError("Background operation retry gave up", e);
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 7285431..e86ebab 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -319,8 +319,18 @@ public class ConnectionStateManager implements Closeable
{
try
{
- // give ConnectionState.checkTimeouts() a chance to run, reset ensemble providers, etc.
- client.getZookeeperClient().getZooKeeper();
+ if ( client.getZookeeperClient().isConnected() )
+ {
+ // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
+ // this "hack" fixes it by resetting the connection
+ log.warn("ConnectionState is LOST but isConnected() is true. Resetting connection.");
+ client.getZookeeperClient().reset();
+ }
+ else
+ {
+ // give ConnectionState.checkTimeouts() a chance to run, reset ensemble providers, etc.
+ client.getZookeeperClient().getZooKeeper();
+ }
}
catch ( Exception e )
{
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 5a7c415..6fcd553 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -75,6 +75,42 @@ public class TestFrameworkEdges extends BaseClassForTests
System.setProperty("zookeeper.extendedTypesEnabled", "true");
}
+ @Test(description = "test case for CURATOR-525")
+ public void testValidateConnectionEventRaces() throws Exception
+ {
+ // CURATOR-525: a stale session-expiration result can be processed after the connection has
+ // already been repaired, moving Curator to LOST while ZooKeeper is connected. Before the fix the
+ // instance stayed a zombie in LOST; this test expects LOST to be followed by RECONNECTED.
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 2000, 1000, new RetryOneTime(1)))
+ {
+ CuratorFrameworkImpl clientImpl = (CuratorFrameworkImpl)client;
+ // the cast gives access to the package-private @VisibleForTesting debug hooks used below
+ client.start();
+ client.getChildren().forPath("/");
+ client.create().forPath("/foo");
+ // record every connection-state transition so the exact sequence can be asserted
+ BlockingQueue<ConnectionState> stateQueue = new LinkedBlockingQueue<>();
+ client.getConnectionStateListenable().addListener((__, newState) -> stateQueue.add(newState));
+ // stopping the server is expected to drive the client through SUSPENDED and then LOST
+ server.stop();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.SUSPENDED);
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+ // arm the checkBackgroundRetry() scaffolding: it counts down the "ready" latch, then blocks on the other
+ clientImpl.debugCheckBackgroundRetryReadyLatch = new CountDownLatch(1);
+ clientImpl.debugCheckBackgroundRetryLatch = new CountDownLatch(1);
+ // this background delete is expected to fail while the server is down and park in the scaffolding
+ client.delete().guaranteed().inBackground().forPath("/foo");
+ timing.awaitLatch(clientImpl.debugCheckBackgroundRetryReadyLatch);
+ server.restart();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+ clientImpl.injectedCode = KeeperException.Code.SESSIONEXPIRED; // the parked thread will handle an expiration even though the connection is already repaired
+ clientImpl.debugCheckBackgroundRetryLatch.countDown();
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.LOST);
+ // with the fix, ConnectionStateManager sees LOST while isConnected() is true and resets the connection
+ Assert.assertEquals(timing.takeFromQueue(stateQueue), ConnectionState.RECONNECTED);
+ }
+ }
+
@Test
public void testInjectSessionExpiration() throws Exception
{