You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/10/07 12:49:10 UTC
[curator] 01/01: wip
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch persistent-watcher-cache
in repository https://gitbox.apache.org/repos/asf/curator.git
commit c2590fcbf581d2ec1c8e4a329c865ece821972e6
Author: randgalt <ra...@apache.org>
AuthorDate: Mon Oct 7 15:48:57 2019 +0300
wip
---
.../framework/recipes/cache/CuratorCache.java | 132 +++++++++
.../framework/recipes/cache/CuratorCacheImpl.java | 249 +++++++++++++++++
.../recipes/cache/CuratorCacheListener.java | 217 +++++++++++++++
.../recipes/cache/CuratorCacheStorage.java | 56 ++++
.../curator/framework/recipes/cache/NodeCache.java | 3 +
.../recipes/cache/NodeCacheListenerWrapper.java | 41 +++
.../framework/recipes/cache/PathChildrenCache.java | 3 +
.../cache/PathChildrenCacheListenerWrapper.java | 46 ++++
.../recipes/cache/StandardCuratorCacheStorage.java | 98 +++++++
.../curator/framework/recipes/cache/TreeCache.java | 3 +
.../recipes/cache/TreeCacheListenerWrapper.java | 46 ++++
.../framework/recipes/watch/PersistentWatcher.java | 39 ++-
.../framework/recipes/cache/TestCuratorCache.java | 93 +++++++
.../recipes/cache/TestCuratorCacheConsistency.java | 301 +++++++++++++++++++++
.../recipes/cache/TestWrappedNodeCache.java | 147 ++++++++++
.../recipes/watch/TestPersistentWatcher.java | 20 ++
.../org/apache/curator/test/TestingCluster.java | 2 +-
.../x/async/api/AsyncPersistentWatchBuilder.java | 18 ++
.../details/AsyncPersistentWatchBuilderImpl.java | 75 -----
19 files changed, 1511 insertions(+), 78 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ef8d290
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Optional;
+
+/**
+ * <p>
+ * A utility that attempts to keep the data from a node locally cached. Optionally the entire
+ * tree of children below the node can also be cached. Will respond to update/create/delete events, pull
+ * down the data, etc. You can register listeners that will get notified when changes occur.
+ * </p>
+ *
+ * <p>
+ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ * be prepared for false-positives and false-negatives. Additionally, always use the version number
+ * when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+ /**
+ * cache build options
+ */
+ enum Options
+ {
+ /**
+ * Cache the entire tree of nodes starting at the given node
+ */
+ RECURSIVE,
+
+ /**
+ * Decompress data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+ */
+ COMPRESSED_DATA
+ }
+
+ /**
+ * Return a Curator cache for the given path with the given options using a standard storage instance
+ *
+ * @param client Curator client
+ * @param path path to cache
+ * @param options any options
+ * @return cache (note it must be started via {@link #start()}
+ */
+ static CuratorCache build(CuratorFramework client, String path, Options... options)
+ {
+ return build(client, CuratorCacheStorage.standard(), path, options);
+ }
+
+ /**
+ * Return a Curator cache for the given path with the given options and the given storage instance
+ *
+ * @param client Curator client
+ * @param storage storage to use
+ * @param path path to cache
+ * @param options any options
+ * @return cache (note it must be started via {@link #start()}
+ */
+ static CuratorCache build(CuratorFramework client, CuratorCacheStorage storage, String path, Options... options)
+ {
+ return new CuratorCacheImpl(client, storage, path, options);
+ }
+
+ /**
+ * Start the cache. This will cause a complete refresh from the cache's root node and generate
+ * events for all nodes found, etc.
+ */
+ void start();
+
+ /**
+ * Close the cache, stop responding to events, etc. Note: also calls {@link CuratorCacheStorage#close()}
+ */
+ @Override
+ void close();
+
+ /**
+ * Utility to force a rebuild of the cache. Normally, this should not ever be needed
+ */
+ void forceRebuild();
+
+ /**
+ * Return the storage instance being used
+ *
+ * @return storage
+ */
+ CuratorCacheStorage storage();
+
+ /**
+ * Return the root node being cached (i.e. the node passed to the builder)
+ *
+ * @return root node path
+ */
+ String getRootPath();
+
+ /**
+ * Convenience to return the root node data
+ *
+ * @return data (if it's in the cache)
+ */
+ default Optional<ChildData> getRootData()
+ {
+ return storage().get(getRootPath());
+ }
+
+ /**
+ * Return the listener container so that listeners can be registered to be notified of changes to the cache
+ *
+ * @return listener container
+ */
+ Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..ef37ebe
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+class CuratorCacheImpl implements CuratorCache
+{
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final PersistentWatcher persistentWatcher;
+ private final CuratorFramework client;
+ private final CuratorCacheStorage storage;
+ private final String path;
+ private final boolean recursive;
+ private final boolean compressedData;
+ private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options... optionsArg)
+ {
+ Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+ this.client = client;
+ this.storage = storage;
+ this.path = path;
+ this.recursive = options.contains(Options.RECURSIVE);
+ this.compressedData = options.contains(Options.COMPRESSED_DATA);
+ persistentWatcher = new PersistentWatcher(client, path, recursive);
+ persistentWatcher.getListenable().addListener(this::processEvent);
+ persistentWatcher.getResetListenable().addListener(this::forceRebuild);
+ }
+
+ @Override
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ persistentWatcher.start();
+ }
+
+ @Override
+ public void close()
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ persistentWatcher.close();
+ storage.close();
+ }
+ }
+
+ @Override
+ public void forceRebuild()
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ nodeChanged(path);
+ storage.stream()
+ .map(ChildData::getPath)
+ .filter(p -> !p.equals(path))
+ .forEach(this::nodeChanged);
+ }
+
+ @Override
+ public String getRootPath()
+ {
+ return path;
+ }
+
+ @Override
+ public CuratorCacheStorage storage()
+ {
+ return storage;
+ }
+
+ @Override
+ public Listenable<CuratorCacheListener> listenable()
+ {
+ return listenerManager;
+ }
+
+ protected void processEvent(WatchedEvent event)
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ switch ( event.getType() )
+ {
+ case NodeDataChanged:
+ case NodeCreated:
+ {
+ nodeChanged(event.getPath());
+ break;
+ }
+
+ case NodeDeleted:
+ {
+ removeStorage(event.getPath());
+ break;
+ }
+
+ case NodeChildrenChanged:
+ {
+ nodeChildrenChanged(event.getPath());
+ break;
+ }
+ }
+ }
+
+ private void nodeChildrenChanged(String fromPath)
+ {
+ if ( (state.get() != State.STARTED) || !recursive )
+ {
+ return;
+ }
+
+ try
+ {
+ BackgroundCallback callback = (__, event) -> {
+ if ( event.getResultCode() == OK.intValue() )
+ {
+ event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+ }
+ else if ( event.getResultCode() == NONODE.intValue() )
+ {
+ removeStorage(event.getPath());
+ }
+ else
+ {
+ System.err.println(event); // TODO
+ }
+ };
+ client.getChildren().inBackground(callback).forPath(fromPath);
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace(); // TODO
+ }
+ }
+
+ private void nodeChanged(String fromPath)
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ try
+ {
+ BackgroundCallback callback = (__, event) -> {
+ if ( event.getResultCode() == OK.intValue() )
+ {
+ putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+ nodeChildrenChanged(event.getPath());
+ }
+ else if ( event.getResultCode() == NONODE.intValue() )
+ {
+ removeStorage(event.getPath());
+ }
+ else
+ {
+ System.err.println(event); // TODO
+ }
+ };
+ if ( compressedData )
+ {
+ client.getData().decompressed().inBackground(callback).forPath(fromPath);
+ }
+ else
+ {
+ client.getData().inBackground(callback).forPath(fromPath);
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace(); // TODO
+ // TODO
+ }
+ }
+
+ private void putStorage(ChildData data)
+ {
+ Optional<ChildData> previousData = storage.put(data);
+ if ( previousData.isPresent() )
+ {
+ if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+ {
+ callListeners(l -> l.nodeChanged(previousData.get(), data));
+ }
+ }
+ else
+ {
+ callListeners(l -> l.nodeCreated(data));
+ }
+ }
+
+ private void removeStorage(String path)
+ {
+ storage.remove(path).ifPresent(previousData -> callListeners(l -> l.nodeDeleted(previousData)));
+ }
+
+ private void callListeners(Consumer<CuratorCacheListener> proc)
+ {
+ if ( state.get() == State.STARTED )
+ {
+ client.runSafe(() -> listenerManager.forEach(proc));
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..0d516f1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,217 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Listener for {@link CuratorCache} events
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheListener
+{
+ /**
+ * Called when a node that was in the cache was created
+ *
+ * @param node the value of the node
+ */
+ default void nodeCreated(ChildData node)
+ {
+ // NOP
+ }
+
+ /**
+ * Called when a node that was in the cache has changed
+ *
+ * @param oldNode the previous value of the node
+ * @param node the new value of the node
+ */
+ default void nodeChanged(ChildData oldNode, ChildData node)
+ {
+ // NOP
+ }
+
+ /**
+ * Called when a node that was in the cache was deleted
+ *
+ * @param oldNode the pre-deleted value of the node
+ */
+ default void nodeDeleted(ChildData oldNode)
+ {
+ // NOP
+ }
+
+ @FunctionalInterface
+ interface ChangeListener {
+ /**
+ * Called when a node changes or is deleted
+ *
+ * @param oldNode the old node or null
+ * @param node the new node or null
+ */
+ void nodeChanged(ChildData oldNode, ChildData node);
+ }
+
+ /**
+ * Utility - returns a CuratorCacheListener that calls the given listener for when a node
+ * changes
+ *
+ * @param listener the listener
+ * @return a CuratorCacheListener
+ */
+ static CuratorCacheListener forNodeChanges(ChangeListener listener)
+ {
+ return new CuratorCacheListener()
+ {
+ @Override
+ public void nodeChanged(ChildData oldNode, ChildData node)
+ {
+ listener.nodeChanged(oldNode, node);
+ }
+ };
+ }
+
+ /**
+ * Utility - returns a CuratorCacheListener that calls the given listener when a node
+ * is created
+ *
+ * @param listener the listener
+ * @return a CuratorCacheListener
+ */
+ static CuratorCacheListener forNodeCreates(Consumer<ChildData> listener)
+ {
+ return new CuratorCacheListener()
+ {
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ listener.accept(node);
+ }
+ };
+ }
+
+ /**
+ * Utility - returns a CuratorCacheListener that calls the given listener for either
+ * new nodes or changed nodes
+ *
+ * @param listener the listener
+ * @return a CuratorCacheListener
+ */
+ static CuratorCacheListener forNodeCreatesChanges(ChangeListener listener)
+ {
+ return new CuratorCacheListener()
+ {
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ listener.nodeChanged(null, node);
+ }
+
+ @Override
+ public void nodeChanged(ChildData oldNode, ChildData node)
+ {
+ listener.nodeChanged(oldNode, node);
+ }
+ };
+ }
+
+ /**
+ * Utility - returns a CuratorCacheListener that calls the given listener when a node
+ * is deleted
+ *
+ * @param listener the listener
+ * @return a CuratorCacheListener
+ */
+ static CuratorCacheListener forNodeDeletes(Consumer<ChildData> listener)
+ {
+ return new CuratorCacheListener()
+ {
+ @Override
+ public void nodeDeleted(ChildData oldNode)
+ {
+ listener.accept(oldNode);
+ }
+ };
+ }
+
+ /**
+ * Utility - returns a CuratorCacheListener that calls the given listener for all events.
+ * Some may prefer a lambda-style interface over individual methods for each event. If
+ * both {@code oldNode} and {@code node} are non-null, it is a change. If only {@code oldNode}
+ * is null it is a creation. If only {@code node} is null it is a deletion.
+ *
+ * @param listener the listener
+ * @return a CuratorCacheListener
+ */
+ static CuratorCacheListener forAll(BiConsumer<ChildData, ChildData> listener)
+ {
+ return new CuratorCacheListener()
+ {
+ @Override
+ public void nodeDeleted(ChildData oldNode)
+ {
+ listener.accept(oldNode, null);
+ }
+
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ listener.accept(null, node);
+ }
+
+ @Override
+ public void nodeChanged(ChildData oldNode, ChildData node)
+ {
+ listener.accept(oldNode, node);
+ }
+ };
+ }
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+ * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+ * does not register the listener with the connection state listener container. Also note that CuratorCache
+ * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+ * things such as event ordering will likely be different.
+ *
+ * @param client the curator client
+ * @param listener the listener to wrap
+ * @return a CuratorCacheListener that forwards to the given listener
+ */
+ static CuratorCacheListener wrap(CuratorFramework client, PathChildrenCacheListener listener)
+ {
+ return new PathChildrenCacheListenerWrapper(listener, client);
+ }
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+ * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+ * does not register the listener with the connection state listener container. Also note that CuratorCache
+ * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+ * things such as event ordering will likely be different.
+ *
+ * @param client the curator client
+ * @param listener the listener to wrap
+ * @return a CuratorCacheListener that forwards to the given listener
+ */
+ static CuratorCacheListener wrap(CuratorFramework client, TreeCacheListener listener)
+ {
+ return new TreeCacheListenerWrapper(client, listener);
+ }
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+ * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+ * does not register the listener with the connection state listener container. Also note that CuratorCache
+ * behaves differently than {@link org.apache.curator.framework.recipes.cache.NodeCache} so
+ * things such as event ordering will likely be different.
+ *
+ * @param listener the listener to wrap
+ * @return a CuratorCacheListener that forwards to the given listener
+ */
+ static CuratorCacheListener wrap(NodeCacheListener listener)
+ {
+ return new NodeCacheListenerWrapper(listener);
+ }
+
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..88a8ebf
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public interface CuratorCacheStorage extends Closeable
+{
+ static CuratorCacheStorage standard() {
+ return new StandardCuratorCacheStorage(true);
+ }
+
+ static CuratorCacheStorage bytesNotCached() {
+ return new StandardCuratorCacheStorage(false);
+ }
+
+ CuratorCacheStorage doNotClearOnClose();
+
+ Optional<ChildData> put(ChildData data);
+
+ Optional<ChildData> remove(String path);
+
+ Optional<ChildData> get(String path);
+
+ boolean containsPath(String path);
+
+ int size();
+
+ Stream<ChildData> stream();
+
+ void clear();
+
+ @Override
+ default void close()
+ {
+ clear();
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 9687e1b..f730f78 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -53,7 +53,10 @@ import java.util.concurrent.atomic.AtomicReference;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
*/
+@Deprecated
public class NodeCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..0811f05
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,41 @@
+package org.apache.curator.framework.recipes.cache;
+
+public class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+ private final NodeCacheListener listener;
+
+ public NodeCacheListenerWrapper(NodeCacheListener listener)
+ {
+ this.listener = listener;
+ }
+
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ callListener();
+ }
+
+ @Override
+ public void nodeChanged(ChildData oldNode, ChildData node)
+ {
+ callListener();
+ }
+
+ @Override
+ public void nodeDeleted(ChildData oldNode)
+ {
+ callListener();
+ }
+
+ private void callListener()
+ {
+ try
+ {
+ listener.nodeChanged();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index bdc73cc..eb37936 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -64,8 +64,11 @@ import java.util.concurrent.atomic.AtomicReference;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
*/
@SuppressWarnings("NullableProblems")
+@Deprecated
public class PathChildrenCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..237a0e8
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+ private final PathChildrenCacheListener listener;
+ private final CuratorFramework client;
+
+ public PathChildrenCacheListenerWrapper(PathChildrenCacheListener listener, CuratorFramework client)
+ {
+ this.listener = listener;
+ this.client = client;
+ }
+
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ sendEvent(node, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ }
+
+ @Override
+ public void nodeChanged(ChildData oldData, ChildData node)
+ {
+ sendEvent(node, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ }
+
+ @Override
+ public void nodeDeleted(ChildData oldNode)
+ {
+ sendEvent(oldNode, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
+
+ private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+ {
+ PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
+ try
+ {
+ listener.childEvent(client, event);
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..5049770
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+ private final Map<String, ChildData> dataMap;
+ private final boolean cacheBytes;
+
+ StandardCuratorCacheStorage(boolean cacheBytes)
+ {
+ this(new ConcurrentHashMap<>(), cacheBytes);
+ }
+
+ @Override
+ public CuratorCacheStorage doNotClearOnClose()
+ {
+ Map<String, ChildData> copyMap = new ConcurrentHashMap<>(dataMap);
+ return new StandardCuratorCacheStorage(copyMap, cacheBytes)
+ {
+ @Override
+ public void close()
+ {
+ // NOP
+ }
+ };
+ }
+
+ @Override
+ public Optional<ChildData> put(ChildData data)
+ {
+ ChildData localData = cacheBytes ? data : new ChildData(data.getPath(), data.getStat(), null);
+ return Optional.ofNullable(dataMap.put(data.getPath(), localData));
+ }
+
+ @Override
+ public Optional<ChildData> remove(String path)
+ {
+ return Optional.ofNullable(dataMap.remove(path));
+ }
+
+ @Override
+ public Optional<ChildData> get(String path)
+ {
+ return Optional.ofNullable(dataMap.get(path));
+ }
+
+ @Override
+ public boolean containsPath(String path)
+ {
+ return dataMap.containsKey(path);
+ }
+
+ @Override
+ public int size()
+ {
+ return dataMap.size();
+ }
+
+ @Override
+ public Stream<ChildData> stream()
+ {
+ return dataMap.values().stream();
+ }
+
+ @Override
+ public void clear()
+ {
+ dataMap.clear();
+ }
+
+ private StandardCuratorCacheStorage(Map<String, ChildData> dataMap, boolean cacheBytes)
+ {
+ this.dataMap = dataMap;
+ this.cacheBytes = cacheBytes;
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index f42c1d5..e321eba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -71,7 +71,10 @@ import static org.apache.curator.utils.PathUtils.validatePath;
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
+ *
+ * @deprecated Use {@link CuratorCache}
*/
+@Deprecated
public class TreeCache implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..51a02cb
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+ private final CuratorFramework client;
+ private final TreeCacheListener listener;
+
+ public TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+ {
+ this.client = client;
+ this.listener = listener;
+ }
+
+ @Override
+ public void nodeCreated(ChildData node)
+ {
+ sendEvent(node, TreeCacheEvent.Type.NODE_ADDED);
+ }
+
+ @Override
+ public void nodeChanged(ChildData oldData, ChildData node)
+ {
+ sendEvent(node, TreeCacheEvent.Type.NODE_UPDATED);
+ }
+
+ @Override
+ public void nodeDeleted(ChildData oldNode)
+ {
+ sendEvent(oldNode, TreeCacheEvent.Type.NODE_REMOVED);
+ }
+
+ private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+ {
+ TreeCacheEvent event = new TreeCacheEvent(type, node);
+ try
+ {
+ listener.childEvent(client, event);
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 2c97490..40174fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -1,4 +1,22 @@
/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -43,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference;
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+ private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
private final ConnectionStateListener connectionStateListener = (client, newState) -> {
if ( newState.isConnected() )
{
@@ -115,13 +134,29 @@ import java.util.concurrent.atomic.AtomicReference;
return listeners;
}
+ /**
+ * Listeners are called when the persistent watcher has been successfully registered
+ * or re-registered after a connection disruption
+ *
+ * @return listener container
+ */
+ public StandardListenerManager<Runnable> getResetListenable()
+ {
+ return resetListeners;
+ }
+
private void reset()
{
try
{
BackgroundCallback callback = (__, event) -> {
- if ( event.getResultCode() != KeeperException.Code.OK.intValue() ) {
- client.runSafe(this::reset);
+ if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+ {
+ reset();
+ }
+ else
+ {
+ resetListeners.forEach(Runnable::run);
}
};
client.addWatch().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..c451118
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,93 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCache extends BaseClassForTests
+{
+ private final Timing2 timing = new Timing2();
+
+ @Test
+ public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
+ {
+ try ( TestingCluster cluster = new TestingCluster(3) )
+ {
+ cluster.start();
+
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ try (CuratorCache cache = CuratorCache.build(client, "/test", RECURSIVE))
+ {
+ cache.start();
+
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener((__, newState) -> {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectLatch.countDown();
+ }
+ });
+ CountDownLatch latch = new CountDownLatch(3);
+ CuratorCacheListener listener = CuratorCacheListener.forNodeCreatesChanges((__, ___) -> latch.countDown());
+ cache.listenable().addListener(listener);
+
+ client.create().forPath("/test/one");
+ client.create().forPath("/test/two");
+ client.create().forPath("/test/three");
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ cluster.killServer(connectionInstance);
+
+ Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+ timing.sleepABit();
+
+ Assert.assertEquals(cache.storage().stream().count(), 4);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+ {
+ CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+ {
+ client.start();
+ final CountDownLatch updatedLatch = new CountDownLatch(1);
+ final CountDownLatch addedLatch = new CountDownLatch(1);
+ client.create().creatingParentsIfNeeded().forPath("/test");
+ try (CuratorCache cache = CuratorCache.build(client, storage, "/test", RECURSIVE))
+ {
+ cache.listenable().addListener(CuratorCacheListener.forNodeChanges((__, ___) -> updatedLatch.countDown()));
+ cache.listenable().addListener(CuratorCacheListener.forNodeCreates(__ -> addedLatch.countDown()));
+ cache.start();
+
+ client.create().forPath("/test/foo", "first".getBytes());
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ Assert.assertTrue(timing.awaitLatch(updatedLatch));
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..e7448d9
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,301 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheStorage;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestCuratorCacheConsistency extends BaseClassForTests
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ private static final Timing2 timing = new Timing2();
+ private static final Duration testLength = Duration.ofSeconds(30);
+ private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+ private static final Duration sleepLength = Duration.ofMillis(5);
+ private static final int nodesPerLevel = 10;
+ private static final int clusterSize = 5;
+ private static final int maxServerKills = 2;
+
+ private static final String BASE_PATH = "/test";
+
+ private static class Client implements Closeable
+ {
+ private final CuratorFramework client;
+ private final CuratorCache cache;
+ private final int index;
+
+ Client(int index, String connectionString)
+ {
+ this.index = index;
+ client = buildClient(connectionString);
+ cache = CuratorCache.build(client, CuratorCacheStorage.standard().doNotClearOnClose(), BASE_PATH, RECURSIVE);
+ }
+
+ void start()
+ {
+ client.start();
+ cache.start();
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ client.close();
+ }
+ }
+
+ @Test
+ public void testConsistencyAfterSimulation() throws Exception
+ {
+ int clientQty = random.nextInt(10, 20);
+ int maxDepth = random.nextInt(5, 10);
+
+ log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+ List<Client> clients = Collections.emptyList();
+ Map<String, String> actualTree;
+
+ try ( TestingCluster cluster = new TestingCluster(clusterSize) )
+ {
+ cluster.start();
+
+ initializeBasePath(cluster);
+ try
+ {
+ clients = buildClients(cluster, clientQty);
+ workLoop(cluster, clients, maxDepth);
+
+ log.info("Test complete - sleeping to allow events to complete");
+ timing.sleepABit();
+ }
+ finally
+ {
+ clients.forEach(Client::close);
+ }
+
+ actualTree = buildActual(cluster);
+ }
+
+ log.info("client qty: {}", clientQty);
+
+ Map<Integer, List<String>> errorsList = clients.stream()
+ .map(client -> findErrors(client.index, actualTree, client.cache.storage()))
+ .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if ( !errorsList.isEmpty() )
+ {
+ log.error("{} clients had errors", errorsList.size());
+ errorsList.forEach((index, errorList) -> {
+ log.error("Client {}", index);
+ errorList.forEach(log::error);
+ log.error("");
+ });
+
+ Assert.fail("Errors found");
+ }
+ }
+
+ private Map<String, String> buildActual(TestingCluster cluster)
+ {
+ Map<String, String> actual = new HashMap<>();
+ try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+ {
+ client.start();
+ buildActual(client, actual, BASE_PATH);
+ }
+ return actual;
+ }
+
+ private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+ {
+ try
+ {
+ byte[] bytes = client.getData().forPath(fromPath);
+ actual.put(fromPath, new String(bytes));
+ client.getChildren().forPath(fromPath).forEach(child -> buildActual(client, actual, ZKPaths.makePath(fromPath, child)));
+ }
+ catch ( Exception e )
+ {
+ Assert.fail("", e);
+ }
+ }
+
+ private List<Client> buildClients(TestingCluster cluster, int clientQty)
+ {
+ return IntStream.range(0, clientQty)
+ .mapToObj(index -> new Client(index, cluster.getConnectString()))
+ .peek(Client::start)
+ .collect(Collectors.toList());
+ }
+
+ private void initializeBasePath(TestingCluster cluster) throws Exception
+ {
+ try ( CuratorFramework client = buildClient(cluster.getConnectString()) )
+ {
+ client.start();
+ client.create().forPath(BASE_PATH, "".getBytes());
+ }
+ }
+
+ private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth) throws Exception
+ {
+ Instant start = Instant.now();
+ Instant lastServerKill = Instant.now();
+ int serverKillIndex = 0;
+ while ( true )
+ {
+ Duration elapsed = Duration.between(start, Instant.now());
+ if ( elapsed.compareTo(testLength) >= 0 )
+ {
+ break;
+ }
+
+ Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+ if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+ {
+ lastServerKill = Instant.now();
+ if ( serverKillIndex < maxServerKills )
+ {
+ doKillServer(cluster, serverKillIndex++);
+ }
+ }
+
+ int thisDepth = random.nextInt(0, maxDepth);
+ String thisPath = buildPath(thisDepth);
+ CuratorFramework client = randomClient(clients);
+ if ( random.nextBoolean() )
+ {
+ doDelete(client, thisPath);
+ }
+ else
+ {
+ doChange(client, thisPath);
+ }
+
+ Thread.sleep(sleepLength.toMillis());
+ }
+ }
+
+ private void doChange(CuratorFramework client, String thisPath)
+ {
+ try
+ {
+ String thisData = Long.toString(random.nextLong());
+ client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+ }
+ catch ( Exception e )
+ {
+ Assert.fail("Could not create/set: " + thisPath);
+ }
+ }
+
+ private void doDelete(CuratorFramework client, String thisPath)
+ {
+ if ( thisPath.equals(BASE_PATH) )
+ {
+ return;
+ }
+ try
+ {
+ client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+ }
+ catch ( Exception e )
+ {
+ Assert.fail("Could not delete: " + thisPath);
+ }
+ }
+
+ private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+ {
+ log.info("Killing server {}", serverKillIndex);
+ InstanceSpec killSpec = new ArrayList<>(cluster.getInstances()).get(serverKillIndex);
+ cluster.killServer(killSpec);
+ }
+
+ private CuratorFramework randomClient(List<Client> clients)
+ {
+ return clients.get(random.nextInt(clients.size())).client;
+ }
+
+ private Map.Entry<Integer, List<String>> findErrors(int index, Map<String, String> tree, CuratorCacheStorage storage)
+ {
+ List<String> errors = new ArrayList<>();
+ if ( tree.size() != storage.size() )
+ {
+ errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", tree.size(), storage.size()));
+ }
+ tree.keySet().forEach(path -> {
+ if ( !storage.containsPath(path) )
+ {
+ errors.add(String.format("Path %s in master but not client", path));
+ }
+ });
+ storage.stream().forEach(data -> {
+ String treeValue = tree.get(data.getPath());
+ if ( treeValue != null )
+ {
+ if ( !treeValue.equals(new String(data.getData())) )
+ {
+ errors.add(String.format("Data at %s is not the same", data.getPath()));
+ }
+ }
+ else
+ {
+ errors.add(String.format("Path %s in client but not master", data.getPath()));
+ }
+ });
+ return new AbstractMap.SimpleEntry<>(index, errors);
+ }
+
+ private String buildPath(int depth)
+ {
+ StringBuilder str = new StringBuilder(BASE_PATH);
+ while ( depth-- > 0 )
+ {
+ int levelNodeName = random.nextInt(nodesPerLevel);
+ str.append("/").append(levelNodeName);
+ }
+ return str.toString();
+ }
+
+ @Override
+ protected void createServer()
+ {
+ // do nothing
+ }
+
+ private static CuratorFramework buildClient(String connectionString)
+ {
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(100, 100);
+ return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), retryPolicy);
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..d94318b
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,147 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.RECURSIVE;
+
+@Test(groups = Zk35MethodInterceptor.zk35Group)
+public class TestWrappedNodeCache extends BaseClassForTests
+{
+ private final Timing2 timing = new Timing2();
+
+ @Test
+ public void testDeleteThenCreate() throws Exception
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+ final Semaphore semaphore = new Semaphore(0);
+ cache = CuratorCache.build(client, "/test/foo", RECURSIVE);
+ NodeCacheListener listener = semaphore::release;
+ cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+ cache.start();
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+ Assert.assertTrue(cache.getRootData().isPresent());
+ Assert.assertEquals(cache.getRootData().get().getData(), "one".getBytes());
+
+ client.delete().forPath("/test/foo");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ client.create().forPath("/test/foo", "two".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+
+ Assert.assertTrue(cache.getRootData().isPresent());
+ Assert.assertEquals(cache.getRootData().get().getData(), "two".getBytes());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testKilledSession() throws Exception
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = null;
+ try
+ {
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+ CountDownLatch lostLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener((__, newState) -> {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ });
+
+ cache = CuratorCache.build(client,"/test/node", RECURSIVE);
+
+ Semaphore latch = new Semaphore(0);
+ NodeCacheListener listener = latch::release;
+ cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+ cache.start();
+ Assert.assertTrue(timing.acquireSemaphore(latch));
+
+ Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+ Assert.assertTrue(cache.getRootData().isPresent());
+ Assert.assertEquals(cache.getRootData().get().getData(), "start".getBytes());
+
+ client.setData().forPath("/test/node", "new data".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(latch));
+ Assert.assertTrue(cache.getRootData().isPresent());
+ Assert.assertEquals(cache.getRootData().get().getData(), "new data".getBytes());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testBasics() throws Exception
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath("/test");
+
+ cache = CuratorCache.build(client, "/test/node", RECURSIVE);
+ cache.start();
+
+ final Semaphore semaphore = new Semaphore(0);
+ NodeCacheListener listener = semaphore::release;
+ cache.listenable().addListener(CuratorCacheListener.wrap(listener));
+
+ Assert.assertNull(cache.getRootData().orElse(null));
+
+ client.create().forPath("/test/node", "a".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ Assert.assertEquals(cache.getRootData().orElse(null).getData(), "a".getBytes());
+
+ client.setData().forPath("/test/node", "b".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ Assert.assertEquals(cache.getRootData().orElse(null).getData(), "b".getBytes());
+
+ client.delete().forPath("/test/node");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ Assert.assertNull(cache.getRootData().orElse(null));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
index df18de5..b3d5701 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.watch;
import org.apache.curator.framework.CuratorFramework;
@@ -6,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.test.compatibility.Zk35MethodInterceptor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
@@ -14,6 +33,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+@Test(groups = Zk35MethodInterceptor.zk35Group)
public class TestPersistentWatcher extends BaseClassForTests
{
private final Timing2 timing = new Timing2();
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
index 3d38fe1..58da2c0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java
@@ -225,7 +225,7 @@ public class TestingCluster implements Closeable
*/
public InstanceSpec findConnectionInstance(ZooKeeper client) throws Exception
{
- Method m = client.getClass().getDeclaredMethod("testableRemoteSocketAddress");
+ Method m = ZooKeeper.class.getDeclaredMethod("testableRemoteSocketAddress");
m.setAccessible(true);
InetSocketAddress address = (InetSocketAddress)m.invoke(client);
if ( address != null )
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
index 0f29233..143a2c8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -1,4 +1,22 @@
/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
deleted file mode 100644
index 14f3e30..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package org.apache.curator.x.async.details;
-
- import org.apache.curator.framework.api.AddPersistentWatchable;
- import org.apache.curator.framework.api.CuratorWatcher;
- import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
- import org.apache.curator.framework.imps.CuratorFrameworkImpl;
- import org.apache.curator.framework.imps.Watching;
- import org.apache.curator.x.async.AsyncStage;
- import org.apache.curator.x.async.api.AsyncPathable;
- import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
- import org.apache.zookeeper.Watcher;
-
- import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
- import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
-
- class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>>
- {
- private final CuratorFrameworkImpl client;
- private final Filters filters;
- private Watching watching = null;
- private boolean recursive = false;
-
- AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
- {
- this.client = client;
- this.filters = filters;
- }
-
- @Override
- public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
- {
- recursive = true;
- return this;
- }
-
- @Override
- public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
- {
- watching = new Watching(client, watcher);
- return this;
- }
-
- @Override
- public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
- {
- watching = new Watching(client, watcher);
- return this;
- }
-
- @Override
- public AsyncStage<Void> forPath(String path)
- {
- BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc);
- AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive);
- return safeCall(common.internalCallback, () -> builder.forPath(path));
- }
- }
\ No newline at end of file