You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2021/11/14 13:02:17 UTC

[curator] 01/01: `ChildrenCache` (used by Queues) didn't have a `ConnectionStateListener`. Thus, if a long network partition occurred, the ZK instance would be recreated, losing any watchers that had been set, and the `ChildrenCache` would fail to continue watching for changes. Adding a `ConnectionStateListener` fixes this.

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-623
in repository https://gitbox.apache.org/repos/asf/curator.git

commit f78e6d227058f2044b8b193506d5d864f6e50fa1
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 14 13:00:51 2021 +0000

    `ChildrenCache` (used by Queues) didn't have a `ConnectionStateListener`. Thus, if a long network partition occurred, the ZK instance would be recreated, losing any watchers that had been set, and the `ChildrenCache` would fail to continue watching for changes. Adding a `ConnectionStateListener` fixes this.
---
 .../framework/recipes/queue/ChildrenCache.java     | 34 +++++---
 .../recipes/queue/TestLongNetworkPartition.java    | 98 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 12 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
index e5c7e8c..9e25e8d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
@@ -25,6 +25,9 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import java.io.Closeable;
@@ -33,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 class ChildrenCache implements Closeable
 {
@@ -49,7 +51,7 @@ class ChildrenCache implements Closeable
         {
             if ( !isClosed.get() )
             {
-                sync(true);
+                sync();
             }
         }
     };
@@ -66,6 +68,19 @@ class ChildrenCache implements Closeable
         }
     };
 
+    private final ConnectionStateListener connectionStateListener = (__, newState) -> {
+        if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
+            try
+            {
+                sync();
+            }
+            catch ( Exception e )
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
     static class Data
     {
         final List<String>      children;
@@ -86,12 +101,14 @@ class ChildrenCache implements Closeable
 
     void start() throws Exception
     {
-        sync(true);
+        client.getConnectionStateListenable().addListener(connectionStateListener);
+        sync();
     }
 
     @Override
     public void close() throws IOException
     {
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
         client.removeWatchers();
         isClosed.set(true);
         notifyFromCallback();
@@ -137,16 +154,9 @@ class ChildrenCache implements Closeable
         notifyAll();
     }
 
-    private synchronized void sync(boolean watched) throws Exception
+    private synchronized void sync() throws Exception
     {
-        if ( watched )
-        {
-            client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
-        }
-        else
-        {
-            client.getChildren().inBackground(callback).forPath(path);
-        }
+        client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
     }
 
     private synchronized void setNewChildren(List<String> newChildren)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
new file mode 100644
index 0000000..2e3c7a5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.queue;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.junit.jupiter.api.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class TestLongNetworkPartition {
+    private static final Timing2 timing = new Timing2();
+
+    // test for https://issues.apache.org/jira/browse/CURATOR-623
+    @Test
+    public void testLongNetworkPartition() throws Exception {
+        final CompletableFuture<Void> done = new CompletableFuture<>();
+        try (final TestingCluster testingCluster = started(new TestingCluster(1));
+                final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+                    final DistributedQueue<String> dyingQueue = newQueue(dyingCuratorFramework, item -> {
+            if ( item.equals("0") )
+            {
+                done.complete(null);
+            }
+        }))
+        {
+            dyingQueue.start();
+            testingCluster.killServer(testingCluster.getInstances().iterator().next());
+            timing.forSessionSleep().multiple(2).sleep();
+            testingCluster.restartServer(testingCluster.getInstances().iterator().next());
+            try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+                    final DistributedQueue<String> aliveQueue = newQueue(aliveCuratorFramework, null))
+            {
+                aliveQueue.start();
+                aliveQueue.put("0");
+                done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
+        curatorFramework.start();
+        return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer<String>() {
+            @Override
+            public void consumeMessage(String o) {
+                consumer.accept(o);
+            }
+
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+            }
+        }, new QueueSerializer<String>() {
+            @Override
+            public byte[] serialize(String item) {
+                return item.getBytes();
+            }
+
+            @Override
+            public String deserialize(byte[] bytes) {
+                return new String(bytes);
+            }
+        }, "/MyChildrenCacheTest/queue").buildQueue();
+    }
+
+    private static TestingCluster started(TestingCluster testingCluster) throws Exception {
+        testingCluster.start();
+        return testingCluster;
+    }
+
+    private static CuratorFramework getCuratorFramework(String connectString) {
+        return CuratorFrameworkFactory.builder()
+            .ensembleProvider(new FixedEnsembleProvider(connectString, true))
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    }
+}
\ No newline at end of file