You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2021/11/14 13:02:17 UTC
[curator] 01/01: `ChildrenCache` (used by Queues) didn't have a `ConnectionStateListener`. Thus, if a long network partition occurred, the ZK instance would be recreated, losing any watchers that had been set, and the ChildrenCache would stop watching for changes. Adding a ConnectionStateListener fixes this.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-623
in repository https://gitbox.apache.org/repos/asf/curator.git
commit f78e6d227058f2044b8b193506d5d864f6e50fa1
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 14 13:00:51 2021 +0000
`ChildrenCache` (used by Queues) didn't have a `ConnectionStateListener`. Thus, if a long network partition occurred, the ZK instance would be recreated, losing any watchers that had been set, and the ChildrenCache would stop watching for changes. Adding a ConnectionStateListener fixes this.
---
.../framework/recipes/queue/ChildrenCache.java | 34 +++++---
.../recipes/queue/TestLongNetworkPartition.java | 98 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 12 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
index e5c7e8c..9e25e8d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
@@ -25,6 +25,9 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import java.io.Closeable;
@@ -33,7 +36,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
class ChildrenCache implements Closeable
{
@@ -49,7 +51,7 @@ class ChildrenCache implements Closeable
{
if ( !isClosed.get() )
{
- sync(true);
+ sync();
}
}
};
@@ -66,6 +68,19 @@ class ChildrenCache implements Closeable
}
};
+ private final ConnectionStateListener connectionStateListener = (__, newState) -> {
+ if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
+ try
+ {
+ sync();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
static class Data
{
final List<String> children;
@@ -86,12 +101,14 @@ class ChildrenCache implements Closeable
void start() throws Exception
{
- sync(true);
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ sync();
}
@Override
public void close() throws IOException
{
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
client.removeWatchers();
isClosed.set(true);
notifyFromCallback();
@@ -137,16 +154,9 @@ class ChildrenCache implements Closeable
notifyAll();
}
- private synchronized void sync(boolean watched) throws Exception
+ private synchronized void sync() throws Exception
{
- if ( watched )
- {
- client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
- }
- else
- {
- client.getChildren().inBackground(callback).forPath(path);
- }
+ client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
}
private synchronized void setNewChildren(List<String> newChildren)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
new file mode 100644
index 0000000..2e3c7a5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.queue;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.junit.jupiter.api.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class TestLongNetworkPartition {
+ private static final Timing2 timing = new Timing2();
+
+ // test for https://issues.apache.org/jira/browse/CURATOR-623
+ @Test
+ public void testLongNetworkPartition() throws Exception {
+ final CompletableFuture<Void> done = new CompletableFuture<>();
+ try (final TestingCluster testingCluster = started(new TestingCluster(1));
+ final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+ final DistributedQueue<String> dyingQueue = newQueue(dyingCuratorFramework, item -> {
+ if ( item.equals("0") )
+ {
+ done.complete(null);
+ }
+ }))
+ {
+ dyingQueue.start();
+ testingCluster.killServer(testingCluster.getInstances().iterator().next());
+ timing.forSessionSleep().multiple(2).sleep();
+ testingCluster.restartServer(testingCluster.getInstances().iterator().next());
+ try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+ final DistributedQueue<String> aliveQueue = newQueue(aliveCuratorFramework, null))
+ {
+ aliveQueue.start();
+ aliveQueue.put("0");
+ done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
+ curatorFramework.start();
+ return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer<String>() {
+ @Override
+ public void consumeMessage(String o) {
+ consumer.accept(o);
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ }
+ }, new QueueSerializer<String>() {
+ @Override
+ public byte[] serialize(String item) {
+ return item.getBytes();
+ }
+
+ @Override
+ public String deserialize(byte[] bytes) {
+ return new String(bytes);
+ }
+ }, "/MyChildrenCacheTest/queue").buildQueue();
+ }
+
+ private static TestingCluster started(TestingCluster testingCluster) throws Exception {
+ testingCluster.start();
+ return testingCluster;
+ }
+
+ private static CuratorFramework getCuratorFramework(String connectString) {
+ return CuratorFrameworkFactory.builder()
+ .ensembleProvider(new FixedEnsembleProvider(connectString, true))
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ }
+}
\ No newline at end of file