You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/03 17:57:48 UTC
[curator] 01/01: Add recipes that use persistent watchers including
new cache recipes to replace all others.
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch zk36-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 7a2e00241577110fc0525bc751801a134331d843
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 12:45:21 2019 -0500
Add recipes that use persistent watchers including new cache recipes to replace all others.
---
.../framework/recipes/cache/CuratorCache.java | 114 +++++++
.../recipes/cache/CuratorCacheBuilder.java | 63 ++++
.../recipes/cache/CuratorCacheBuilderImpl.java | 74 ++++
.../framework/recipes/cache/CuratorCacheImpl.java | 303 +++++++++++++++++
.../recipes/cache/CuratorCacheListener.java | 78 +++++
.../recipes/cache/CuratorCacheListenerBuilder.java | 129 +++++++
.../cache/CuratorCacheListenerBuilderImpl.java | 161 +++++++++
.../recipes/cache/CuratorCacheStorage.java | 114 +++++++
.../recipes/cache/NodeCacheListenerWrapper.java | 46 +++
.../cache/PathChildrenCacheListenerWrapper.java | 78 +++++
.../recipes/cache/StandardCuratorCacheStorage.java | 88 +++++
.../recipes/cache/TreeCacheListenerWrapper.java | 81 +++++
.../framework/recipes/watch/PersistentWatcher.java | 169 ++++++++++
.../framework/recipes/cache/TestCuratorCache.java | 248 ++++++++++++++
.../recipes/cache/TestCuratorCacheConsistency.java | 373 +++++++++++++++++++++
.../cache/TestCuratorCacheEventOrdering.java | 52 +++
.../recipes/cache/TestCuratorCacheWrappers.java | 162 +++++++++
.../recipes/cache/TestWrappedNodeCache.java | 172 ++++++++++
.../recipes/watch/TestPersistentWatcher.java | 105 ++++++
19 files changed, 2610 insertions(+)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
new file mode 100644
index 0000000..ebe06e0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCache.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+
+/**
+ * <p>
+ * Keeps a local copy of the data of a node and, unless configured otherwise, of the entire tree
+ * of children below that node. The cache responds to update/create/delete events by pulling down
+ * the affected data, and registered listeners are notified when changes occur.
+ * </p>
+ *
+ * <p>
+ * <b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
+ * be prepared for false-positives and false-negatives. Additionally, always use the version number
+ * when updating data to avoid overwriting another process' change.
+ * </p>
+ */
+public interface CuratorCache extends Closeable
+{
+ /**
+ * Options that alter how a cache is built
+ */
+ enum Options
+ {
+ /**
+ * Cache only the given node (i.e. a single node cache) instead of the default behavior
+ * of caching the entire tree of nodes starting at the given node
+ */
+ SINGLE_NODE_CACHE,
+
+ /**
+ * Read node data via {@link org.apache.curator.framework.api.GetDataBuilder#decompressed()}
+ * so that it is decompressed before it is cached
+ */
+ COMPRESSED_DATA,
+
+ /**
+ * Keep the contents of the storage when the cache is closed via {@link CuratorCache#close()}.
+ * Without this option the storage is cleared via {@link CuratorCacheStorage#clear()} on close.
+ */
+ DO_NOT_CLEAR_ON_CLOSE
+ }
+
+ /**
+ * Convenience factory: create a Curator Cache for the given path with the given options,
+ * using a standard storage instance
+ *
+ * @param client Curator client
+ * @param path path to cache
+ * @param options any options
+ * @return cache (note it must be started via {@link #start()})
+ */
+ static CuratorCache build(CuratorFramework client, String path, Options... options)
+ {
+ CuratorCacheBuilder cacheBuilder = builder(client, path);
+ return cacheBuilder.withOptions(options).build();
+ }
+
+ /**
+ * Begin building a Curator Cache when more than the options need to be customized
+ *
+ * @param client Curator client
+ * @param path path to cache
+ * @return builder
+ */
+ static CuratorCacheBuilder builder(CuratorFramework client, String path)
+ {
+ return new CuratorCacheBuilderImpl(client, path);
+ }
+
+ /**
+ * Start the cache. This will cause a complete refresh from the cache's root node and generate
+ * events for all nodes found, etc.
+ */
+ void start();
+
+ /**
+ * Close the cache, stop responding to events, etc.
+ */
+ @Override
+ void close();
+
+ /**
+ * Return the storage instance that holds the cached nodes
+ *
+ * @return storage
+ */
+ CuratorCacheStorage storage();
+
+ /**
+ * Return the listener container. Listeners registered with it are notified of changes to the cache.
+ *
+ * @return listener container
+ */
+ Listenable<CuratorCacheListener> listenable();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
new file mode 100644
index 0000000..35a5f26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Builder for {@link CuratorCache} instances. Each {@code with...} method is optional; call
+ * {@link #build()} to create the cache.
+ */
+public interface CuratorCacheBuilder
+{
+ /**
+ * Set the options for the cache being built
+ *
+ * @param options any options
+ * @return this
+ */
+ CuratorCacheBuilder withOptions(CuratorCache.Options... options);
+
+ /**
+ * Alternate storage to use. If not specified, {@link CuratorCacheStorage#standard()} is used
+ *
+ * @param storage storage instance to use
+ * @return this
+ */
+ CuratorCacheBuilder withStorage(CuratorCacheStorage storage);
+
+ /**
+ * By default any unexpected exception is handled by logging the exception. You can change
+ * this so that a handler is called instead. Under normal circumstances, this shouldn't be necessary.
+ *
+ * @param exceptionHandler exception handler to use
+ * @return this
+ */
+ CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler);
+
+ /**
+ * Normally, listeners are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)}. Use this
+ * method to set a different executor.
+ *
+ * @param executor to use
+ * @return this
+ */
+ CuratorCacheBuilder withExecutor(Executor executor);
+
+ /**
+ * Return a new Curator Cache based on the builder methods that have been called
+ *
+ * @return new Curator Cache
+ */
+ CuratorCache build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
new file mode 100644
index 0000000..9f9e03d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Default {@link CuratorCacheBuilder}. Records the values passed to the {@code with...} methods
+ * and hands them to a new {@link CuratorCacheImpl} in {@link #build()}. Values that were never
+ * supplied are passed on as {@code null}.
+ */
+class CuratorCacheBuilderImpl implements CuratorCacheBuilder
+{
+ private final CuratorFramework client;
+ private final String path;
+ private CuratorCacheStorage storage;
+ private Consumer<Exception> exceptionHandler;
+ private Executor executor;
+ private CuratorCache.Options[] options;
+
+ /**
+ * @param client Curator client the cache will use
+ * @param path path to cache
+ */
+ CuratorCacheBuilderImpl(CuratorFramework client, String path)
+ {
+ this.client = client;
+ this.path = path;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCacheBuilder withOptions(CuratorCache.Options... options)
+ {
+ // copy the array so that later changes made by the caller cannot alter what build() passes on
+ this.options = (options != null) ? options.clone() : null;
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCacheBuilder withStorage(CuratorCacheStorage storage)
+ {
+ this.storage = storage;
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCacheBuilder withExceptionHandler(Consumer<Exception> exceptionHandler)
+ {
+ this.exceptionHandler = exceptionHandler;
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCacheBuilder withExecutor(Executor executor)
+ {
+ this.executor = executor;
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCache build()
+ {
+ return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
+ }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
new file mode 100644
index 0000000..6349530
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.zookeeper.KeeperException.Code.NONODE;
+import static org.apache.zookeeper.KeeperException.Code.OK;
+
+/**
+ * Default {@link CuratorCache} implementation. A {@link PersistentWatcher} on the cache path
+ * supplies change events; each event (and each watcher reset) is turned into background reads
+ * whose results are applied to the {@link CuratorCacheStorage}, after which listeners are
+ * notified through the configured executor.
+ */
+class CuratorCacheImpl implements CuratorCache
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final PersistentWatcher persistentWatcher;
+ private final CuratorFramework client;
+ private final CuratorCacheStorage storage;
+ private final String path;
+ private final boolean recursive;
+ private final boolean compressedData;
+ private final boolean clearOnClose;
+ private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+ private final Consumer<Exception> exceptionHandler;
+ private final Executor executor;
+
+ // Number of background reads still in flight since start(). Set to null once the count drops
+ // back to zero; from then on initialized() has been sent and nothing more is counted.
+ private volatile AtomicLong outstandingOps;
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client Curator client
+ * @param storage storage to use or {@code null} for {@link CuratorCacheStorage#standard()}
+ * @param path path to cache
+ * @param optionsArg cache options or {@code null} for none
+ * @param executor executor used to call listeners or {@code null} to use {@link CuratorFramework#runSafe(Runnable)}
+ * @param exceptionHandler handler for unexpected exceptions or {@code null} to log them
+ */
+ CuratorCacheImpl(CuratorFramework client, CuratorCacheStorage storage, String path, Options[] optionsArg, Executor executor, Consumer<Exception> exceptionHandler)
+ {
+ Set<Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+ this.client = client;
+ this.storage = (storage != null) ? storage : CuratorCacheStorage.standard();
+ this.path = path;
+ recursive = !options.contains(Options.SINGLE_NODE_CACHE);
+ compressedData = options.contains(Options.COMPRESSED_DATA);
+ clearOnClose = !options.contains(Options.DO_NOT_CLEAR_ON_CLOSE);
+ persistentWatcher = new PersistentWatcher(client, path, recursive);
+ persistentWatcher.getListenable().addListener(this::processEvent);
+ persistentWatcher.getResetListenable().addListener(this::rebuild);
+ this.exceptionHandler = (exceptionHandler != null) ? exceptionHandler : e -> log.error("CuratorCache error", e);
+ this.executor = (executor != null) ? executor : client::runSafe;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws IllegalStateException if the cache is not in its initial, never-started state
+ */
+ @Override
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ outstandingOps = new AtomicLong(0);
+ persistentWatcher.start();
+ }
+
+ /**
+ * {@inheritDoc} Only has an effect on a started cache; the storage is cleared unless
+ * {@link Options#DO_NOT_CLEAR_ON_CLOSE} was given.
+ */
+ @Override
+ public void close()
+ {
+ if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ return;
+ }
+
+ persistentWatcher.close();
+ if ( clearOnClose )
+ {
+ storage.clear();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public CuratorCacheStorage storage()
+ {
+ return storage;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Listenable<CuratorCacheListener> listenable()
+ {
+ return listenerManager;
+ }
+
+ /**
+ * Reset handler for the persistent watcher: re-read the root node and then every other node
+ * currently held in storage.
+ */
+ private void rebuild()
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ nodeChanged(path);
+ storage.stream()
+ .map(ChildData::getPath)
+ .filter(p -> !p.equals(path))
+ .forEach(this::nodeChanged);
+ }
+
+ /**
+ * Event handler for the persistent watcher: dispatch a watched event to the matching
+ * refresh/remove operation.
+ *
+ * @param event event from the watcher
+ */
+ private void processEvent(WatchedEvent event)
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ switch ( event.getType() )
+ {
+ case NodeDataChanged:
+ case NodeCreated:
+ {
+ nodeChanged(event.getPath());
+ break;
+ }
+
+ case NodeDeleted:
+ {
+ removeStorage(event.getPath());
+ break;
+ }
+
+ case NodeChildrenChanged:
+ {
+ nodeChildrenChanged(event.getPath());
+ break;
+ }
+
+ default:
+ {
+ // all other event types are ignored
+ break;
+ }
+ }
+ }
+
+ /**
+ * Read the children of the given node in the background and refresh each one. Does nothing
+ * for a single node cache.
+ *
+ * @param fromPath node whose children are to be refreshed
+ */
+ private void nodeChildrenChanged(String fromPath)
+ {
+ if ( (state.get() != State.STARTED) || !recursive )
+ {
+ return;
+ }
+
+ BackgroundCallback callback = (__, event) -> {
+ try
+ {
+ if ( event.getResultCode() == OK.intValue() )
+ {
+ event.getChildren().forEach(child -> nodeChanged(ZKPaths.makePath(fromPath, child)));
+ }
+ else if ( event.getResultCode() == NONODE.intValue() )
+ {
+ removeStorage(event.getPath());
+ }
+ else
+ {
+ handleException(event);
+ }
+ }
+ finally
+ {
+ // always balance the increment made at submit time, even if the handling above throws
+ checkDecrementOutstandingOps();
+ }
+ };
+
+ checkIncrementOutstandingOps();
+ try
+ {
+ client.getChildren().inBackground(callback).forPath(fromPath);
+ }
+ catch ( Exception e )
+ {
+ backgroundSubmitFailed(e);
+ }
+ }
+
+ /**
+ * Read the given node in the background, store the result and then refresh its children.
+ * A node that no longer exists is removed from storage.
+ *
+ * @param fromPath node to refresh
+ */
+ private void nodeChanged(String fromPath)
+ {
+ if ( state.get() != State.STARTED )
+ {
+ return;
+ }
+
+ BackgroundCallback callback = (__, event) -> {
+ try
+ {
+ if ( event.getResultCode() == OK.intValue() )
+ {
+ putStorage(new ChildData(event.getPath(), event.getStat(), event.getData()));
+ nodeChildrenChanged(event.getPath());
+ }
+ else if ( event.getResultCode() == NONODE.intValue() )
+ {
+ removeStorage(event.getPath());
+ }
+ else
+ {
+ handleException(event);
+ }
+ }
+ finally
+ {
+ // always balance the increment made at submit time, even if the handling above throws
+ checkDecrementOutstandingOps();
+ }
+ };
+
+ checkIncrementOutstandingOps();
+ try
+ {
+ if ( compressedData )
+ {
+ client.getData().decompressed().inBackground(callback).forPath(fromPath);
+ }
+ else
+ {
+ client.getData().inBackground(callback).forPath(fromPath);
+ }
+ }
+ catch ( Exception e )
+ {
+ backgroundSubmitFailed(e);
+ }
+ }
+
+ /**
+ * Report a failure to submit a background read and undo the outstanding-operation increment
+ * made for it, so that {@link CuratorCacheListener#initialized()} is not blocked forever.
+ *
+ * @param e the exception thrown while submitting
+ */
+ private void backgroundSubmitFailed(Exception e)
+ {
+ // NOTE(review): assumes forPath() only throws before the request is queued, i.e. the
+ // callback will not also run for this request - confirm against the Curator client
+ handleException(e);
+ checkDecrementOutstandingOps();
+ }
+
+ // NOTE(review): not referenced anywhere in this class - looks like a leftover test hook;
+ // confirm nothing else uses it before removing
+ public static volatile boolean JZTEST = false;
+
+ /**
+ * Store the node and notify listeners: a create when the path was not stored before, a change
+ * when the stored copy has a different mzxid, nothing otherwise.
+ *
+ * @param data node to store
+ */
+ private void putStorage(ChildData data)
+ {
+ Optional<ChildData> previousData = storage.put(data);
+ if ( !previousData.isPresent() )
+ {
+ callListeners(l -> l.event(NODE_CREATED, null, data));
+ }
+ else if ( previousData.get().getStat().getMzxid() != data.getStat().getMzxid() )
+ {
+ callListeners(l -> l.event(NODE_CHANGED, previousData.get(), data));
+ }
+ }
+
+ /**
+ * Remove the path from storage and, if it was stored, notify listeners of the delete.
+ *
+ * @param path path to remove
+ */
+ private void removeStorage(String path)
+ {
+ storage.remove(path).ifPresent(previousData -> callListeners(l -> l.event(NODE_DELETED, previousData, null)));
+ }
+
+ /**
+ * Apply the given call to every registered listener via the executor. Nothing is sent unless
+ * the cache is started.
+ *
+ * @param proc call to make on each listener
+ */
+ private void callListeners(Consumer<CuratorCacheListener> proc)
+ {
+ if ( state.get() == State.STARTED )
+ {
+ executor.execute(() -> listenerManager.forEach(proc));
+ }
+ }
+
+ /**
+ * Convert the result code of a failed background event into a {@link KeeperException} and
+ * pass it to the exception handler.
+ *
+ * @param event failed background event
+ */
+ private void handleException(CuratorEvent event)
+ {
+ handleException(KeeperException.create(KeeperException.Code.get(event.getResultCode())));
+ }
+
+ /**
+ * Pass the exception to the exception handler after giving {@link ThreadUtils#checkInterrupted(Throwable)}
+ * a chance to process it.
+ *
+ * @param e the exception
+ */
+ private void handleException(Exception e)
+ {
+ ThreadUtils.checkInterrupted(e);
+ exceptionHandler.accept(e);
+ }
+
+ /**
+ * Count one more in-flight background read, unless counting has already ended.
+ */
+ private void checkIncrementOutstandingOps()
+ {
+ AtomicLong localOutstandingOps = outstandingOps;
+ if ( localOutstandingOps != null )
+ {
+ localOutstandingOps.incrementAndGet();
+ }
+ }
+
+ /**
+ * Count one in-flight background read as finished. When the count reaches zero, counting
+ * ends and listeners receive {@link CuratorCacheListener#initialized()}.
+ */
+ private void checkDecrementOutstandingOps()
+ {
+ AtomicLong localOutstandingOps = outstandingOps;
+ if ( (localOutstandingOps != null) && (localOutstandingOps.decrementAndGet() == 0) )
+ {
+ outstandingOps = null;
+ callListeners(CuratorCacheListener::initialized);
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
new file mode 100644
index 0000000..620e471
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListener.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+/**
+ * Listener for {@link CuratorCache} events. The main functional interface is general purpose
+ * but you can build event specific listeners, etc. using the builder. Note: by default listeners
+ * are wrapped in {@link org.apache.curator.framework.CuratorFramework#runSafe(Runnable)} when called;
+ * a different executor can be set via {@link CuratorCacheBuilder#withExecutor(java.util.concurrent.Executor)}.
+ */
+@FunctionalInterface
+public interface CuratorCacheListener
+{
+ /**
+ * An enumerated type that describes a change
+ */
+ enum Type
+ {
+ /**
+ * A new node was added to the cache
+ */
+ NODE_CREATED,
+
+ /**
+ * A node already in the cache has changed
+ */
+ NODE_CHANGED,
+
+ /**
+ * A node already in the cache was deleted
+ */
+ NODE_DELETED
+ }
+
+ /**
+ * Called when a node is created, changed or deleted.
+ *
+ * @param type the type of event
+ * @param oldData the old data or null (it is null for {@link Type#NODE_CREATED})
+ * @param data the new data or null (it is null for {@link Type#NODE_DELETED})
+ */
+ void event(Type type, ChildData oldData, ChildData data);
+
+ /**
+ * When the cache is started, the initial nodes are tracked and when they are finished loading
+ * into the cache this method is called. The default implementation does nothing.
+ */
+ default void initialized()
+ {
+ // NOP
+ }
+
+ /**
+ * Returns a builder allowing type specific, and special purpose listeners.
+ *
+ * @return builder
+ */
+ static CuratorCacheListenerBuilder builder()
+ {
+ return new CuratorCacheListenerBuilderImpl();
+ }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
new file mode 100644
index 0000000..c57e881
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
+import java.util.function.Consumer;
+
+/**
+ * Builder for {@link CuratorCacheListener}s. Every {@code for...} method adds one more listener;
+ * {@link #build()} returns a single listener that combines all of them.
+ */
+public interface CuratorCacheListenerBuilder
+{
+ /**
+ * Add a standard listener
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forAll(CuratorCacheListener listener);
+
+ /**
+ * Add a listener only for {@link Type#NODE_CREATED}
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener);
+
+ /**
+ * Listener that receives both the previous and the current state of a node
+ */
+ @FunctionalInterface
+ interface ChangeListener
+ {
+ /**
+ * @param oldNode the previous state of the node
+ * @param node the current state of the node
+ */
+ void event(ChildData oldNode, ChildData node);
+ }
+
+ /**
+ * Add a listener only for {@link Type#NODE_CHANGED}
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forChanges(ChangeListener listener);
+
+ /**
+ * Add a listener only for both {@link Type#NODE_CREATED} and {@link Type#NODE_CHANGED}
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener);
+
+ /**
+ * Add a listener only for {@link Type#NODE_DELETED}
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener);
+
+ /**
+ * Add a listener only for {@link CuratorCacheListener#initialized()}
+ *
+ * @param listener listener to add
+ * @return this
+ */
+ CuratorCacheListenerBuilder forInitialized(Runnable listener);
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.PathChildrenCacheListener}s
+ * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+ * does not register the listener with the connection state listener container. Also note that CuratorCache
+ * behaves differently than {@link org.apache.curator.framework.recipes.cache.PathChildrenCache} so
+ * things such as event ordering will likely be different.
+ *
+ * @param client the curator client
+ * @param listener the listener to wrap
+ * @return this
+ */
+ CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
+ * with CuratorCache. IMPORTANT: the connection state methods in the listener will never be called as CuratorCache
+ * does not register the listener with the connection state listener container. Also note that CuratorCache
+ * behaves differently than {@link org.apache.curator.framework.recipes.cache.TreeCache} so
+ * things such as event ordering will likely be different.
+ *
+ * @param client the curator client
+ * @param listener the listener to wrap
+ * @return this
+ */
+ CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
+
+ /**
+ * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.NodeCacheListener}s
+ * with CuratorCache.
+ *
+ * @param listener the listener to wrap
+ * @return this
+ */
+ CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
+
+ /**
+ * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
+ * i.e. changes that occur as the cache is initializing are not sent to the listener
+ *
+ * @return this
+ */
+ CuratorCacheListenerBuilder afterInitialized();
+
+ /**
+ * Build and return a new listener based on the methods that have been previously called
+ *
+ * @return new listener
+ */
+ CuratorCacheListener build();
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
new file mode 100644
index 0000000..4873868
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
+{
+ private final List<CuratorCacheListener> listeners = new ArrayList<>();
+ private boolean afterInitializedOnly = false;
+
+ @Override
+ public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
+ {
+ listeners.add(listener);
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forCreates(Consumer<ChildData> listener)
+ {
+ listeners.add((type, oldNode, node) -> {
+ if ( type == CuratorCacheListener.Type.NODE_CREATED )
+ {
+ listener.accept(node);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forChanges(ChangeListener listener)
+ {
+ listeners.add((type, oldNode, node) -> {
+ if ( type == CuratorCacheListener.Type.NODE_CHANGED )
+ {
+ listener.event(oldNode, node);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forCreatesAndChanges(ChangeListener listener)
+ {
+ listeners.add((type, oldNode, node) -> {
+ if ( (type == CuratorCacheListener.Type.NODE_CHANGED) || (type == CuratorCacheListener.Type.NODE_CREATED) )
+ {
+ listener.event(oldNode, node);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> listener)
+ {
+ listeners.add((type, oldNode, node) -> {
+ if ( type == CuratorCacheListener.Type.NODE_DELETED )
+ {
+ listener.accept(oldNode);
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forInitialized(Runnable listener)
+ {
+ CuratorCacheListener localListener = new CuratorCacheListener()
+ {
+ @Override
+ public void event(Type type, ChildData oldData, ChildData data)
+ {
+ // NOP
+ }
+
+ @Override
+ public void initialized()
+ {
+ listener.run();
+ }
+ };
+ listeners.add(localListener);
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+ {
+ listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener)
+ {
+ listeners.add(new TreeCacheListenerWrapper(client, listener));
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
+ {
+ listeners.add(new NodeCacheListenerWrapper(listener));
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListenerBuilder afterInitialized()
+ {
+ afterInitializedOnly = true;
+ return this;
+ }
+
+ @Override
+ public CuratorCacheListener build()
+ {
+ List<CuratorCacheListener> copy = new ArrayList<>(listeners);
+ return new CuratorCacheListener()
+ {
+ private volatile boolean isInitialized = !afterInitializedOnly;
+
+ @Override
+ public void event(Type type, ChildData oldData, ChildData data)
+ {
+ if ( isInitialized )
+ {
+ copy.forEach(l -> l.event(type, oldData, data));
+ }
+ }
+
+ @Override
+ public void initialized()
+ {
+ isInitialized = true;
+ copy.forEach(CuratorCacheListener::initialized);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
new file mode 100644
index 0000000..f3d870d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Interface for maintaining data in a {@link CuratorCache}
+ */
+public interface CuratorCacheStorage
+{
+    /**
+     * Create a new standard storage instance that retains the data bytes of each entry
+     *
+     * @return storage instance
+     */
+    static CuratorCacheStorage standard()
+    {
+        return new StandardCuratorCacheStorage(true);
+    }
+
+    /**
+     * Create a new storage instance that does not retain the data bytes, i.e. ChildData objects
+     * held by this storage will always return {@code null} for {@link ChildData#getData()}.
+     *
+     * @return storage instance that does not retain data bytes
+     */
+    static CuratorCacheStorage bytesNotCached()
+    {
+        return new StandardCuratorCacheStorage(false);
+    }
+
+    /**
+     * Store an entry, replacing whatever was stored at the entry's path
+     *
+     * @param data entry to add
+     * @return the entry previously stored at that path or {@code empty()}
+     */
+    Optional<ChildData> put(ChildData data);
+
+    /**
+     * Drop the entry stored at the given path
+     *
+     * @param path path to remove
+     * @return the entry previously stored at that path or {@code empty()}
+     */
+    Optional<ChildData> remove(String path);
+
+    /**
+     * Look up the entry stored at the given path
+     *
+     * @param path path to get
+     * @return entry or {@code empty()}
+     */
+    Optional<ChildData> get(String path);
+
+    /**
+     * Return the current number of entries in storage
+     *
+     * @return number of entries
+     */
+    int size();
+
+    /**
+     * Return a stream over the storage entries. Note: a standard storage instance keeps its entries
+     * in a {@link java.util.concurrent.ConcurrentHashMap} and the stream is taken from a view of that map.
+     *
+     * @return stream over entries
+     */
+    Stream<ChildData> stream();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @param fromParent path of the node whose immediate children are wanted
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+
+    /**
+     * Utility - collect a stream of child nodes into a map keyed by node path. Note: each child
+     * data in the stream must have a unique path.
+     *
+     * @param stream stream of child nodes with unique paths
+     * @return map of path to child data
+     */
+    static Map<String, ChildData> toMap(Stream<ChildData> stream)
+    {
+        return stream.collect(Collectors.toMap(ChildData::getPath, data -> data));
+    }
+
+    /**
+     * Reset the storage to zero entries
+     */
+    void clear();
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
new file mode 100644
index 0000000..468049b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCacheListenerWrapper.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+
+/**
+ * Adapts a {@link NodeCacheListener} to the {@link CuratorCacheListener} interface. Every cache
+ * event, whatever its type, results in a call to {@link NodeCacheListener#nodeChanged()}.
+ */
+class NodeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final NodeCacheListener delegate;
+
+    NodeCacheListenerWrapper(NodeCacheListener listener)
+    {
+        delegate = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        // the wrapped API carries no event details, so the arguments are not passed on
+        try
+        {
+            delegate.nodeChanged();
+        }
+        catch ( Exception failure )
+        {
+            // this method cannot throw checked exceptions - rethrow unchecked, keeping the cause
+            throw new RuntimeException(failure);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
new file mode 100644
index 0000000..a9123c1
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Adapts a {@link PathChildrenCacheListener} to the {@link CuratorCacheListener} interface by
+ * translating cache events into {@link PathChildrenCacheEvent}s.
+ */
+class PathChildrenCacheListenerWrapper implements CuratorCacheListener
+{
+    private final CuratorFramework client;
+    private final PathChildrenCacheListener listener;
+
+    PathChildrenCacheListenerWrapper(CuratorFramework client, PathChildrenCacheListener listener)
+    {
+        this.client = client;
+        this.listener = listener;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                break;
+
+            case NODE_CHANGED:
+                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                break;
+
+            case NODE_DELETED:
+                // a deleted node has no new data, so the removed entry is reported
+                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                break;
+
+            default:
+                // any other type is not forwarded
+                break;
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+    }
+
+    /**
+     * Build a {@link PathChildrenCacheEvent} and hand it to the wrapped listener; any exception
+     * thrown by the listener is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    {
+        PathChildrenCacheEvent translated = new PathChildrenCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, translated);
+        }
+        catch ( Exception failure )
+        {
+            throw new RuntimeException(failure);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
new file mode 100644
index 0000000..bbdb21d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/StandardCuratorCacheStorage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+/**
+ * {@link CuratorCacheStorage} backed by a {@link ConcurrentHashMap} keyed by node path.
+ * Depending on the constructor argument, the data bytes of stored entries are kept or dropped.
+ */
+class StandardCuratorCacheStorage implements CuratorCacheStorage
+{
+    private final boolean cacheBytes;
+    private final Map<String, ChildData> dataMap = new ConcurrentHashMap<>();
+
+    StandardCuratorCacheStorage(boolean cacheBytes)
+    {
+        this.cacheBytes = cacheBytes;
+    }
+
+    @Override
+    public Optional<ChildData> put(ChildData data)
+    {
+        ChildData toStore = data;
+        if ( !cacheBytes )
+        {
+            // keep path and stat only - the payload is deliberately not retained
+            toStore = new ChildData(data.getPath(), data.getStat(), null);
+        }
+        ChildData previous = dataMap.put(data.getPath(), toStore);
+        return Optional.ofNullable(previous);
+    }
+
+    @Override
+    public Optional<ChildData> remove(String path)
+    {
+        ChildData removed = dataMap.remove(path);
+        return Optional.ofNullable(removed);
+    }
+
+    @Override
+    public Optional<ChildData> get(String path)
+    {
+        ChildData found = dataMap.get(path);
+        return Optional.ofNullable(found);
+    }
+
+    @Override
+    public int size()
+    {
+        return dataMap.size();
+    }
+
+    @Override
+    public Stream<ChildData> stream()
+    {
+        return dataMap.values().stream();
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        // an entry is an immediate child when the parent portion of its key equals fromParent
+        return dataMap.entrySet()
+            .stream()
+            .filter(entry -> parentOf(entry.getKey()).equals(fromParent))
+            .map(Map.Entry::getValue);
+    }
+
+    @Override
+    public void clear()
+    {
+        dataMap.clear();
+    }
+
+    // parent portion of a path as computed by ZKPaths.getPathAndNode()
+    private static String parentOf(String path)
+    {
+        return ZKPaths.getPathAndNode(path).getPath();
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
new file mode 100644
index 0000000..570799b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheListenerWrapper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+/**
+ * Adapts a {@link TreeCacheListener} to the {@link CuratorCacheListener} interface by
+ * translating cache events into {@link TreeCacheEvent}s.
+ */
+class TreeCacheListenerWrapper implements CuratorCacheListener
+{
+    private final TreeCacheListener listener;
+    private final CuratorFramework client;
+
+    TreeCacheListenerWrapper(CuratorFramework client, TreeCacheListener listener)
+    {
+        this.listener = listener;
+        this.client = client;
+    }
+
+    @Override
+    public void event(Type type, ChildData oldData, ChildData data)
+    {
+        switch ( type )
+        {
+            case NODE_CREATED:
+                sendEvent(data, TreeCacheEvent.Type.NODE_ADDED);
+                break;
+
+            case NODE_CHANGED:
+                sendEvent(data, TreeCacheEvent.Type.NODE_UPDATED);
+                break;
+
+            case NODE_DELETED:
+                // a deleted node has no new data, so the removed entry is reported
+                sendEvent(oldData, TreeCacheEvent.Type.NODE_REMOVED);
+                break;
+
+            default:
+                // any other type is not forwarded
+                break;
+        }
+    }
+
+    @Override
+    public void initialized()
+    {
+        sendEvent(null, TreeCacheEvent.Type.INITIALIZED);
+    }
+
+    /**
+     * Build a {@link TreeCacheEvent} and hand it to the wrapped listener; any exception thrown
+     * by the listener is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private void sendEvent(ChildData node, TreeCacheEvent.Type type)
+    {
+        TreeCacheEvent translated = new TreeCacheEvent(type, node);
+        try
+        {
+            listener.childEvent(client, translated);
+        }
+        catch ( Exception failure )
+        {
+            throw new RuntimeException(failure);
+        }
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
new file mode 100644
index 0000000..87ecb6e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A managed persistent watcher. The watch will be managed such that it stays set through
+ * connection lapses, etc.
+ */
+public class PersistentWatcher implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final StandardListenerManager<Watcher> listeners = StandardListenerManager.standard();
+ private final StandardListenerManager<Runnable> resetListeners = StandardListenerManager.standard();
+ private final ConnectionStateListener connectionStateListener = (client, newState) -> {
+ if ( newState.isConnected() )
+ {
+ reset();
+ }
+ };
+ private final Watcher watcher = event -> listeners.forEach(w -> w.process(event));
+ private final CuratorFramework client;
+ private final String basePath;
+ private final boolean recursive;
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ /**
+ * @param client client
+ * @param basePath path to set the watch on
+ * @param recursive ZooKeeper persistent watches can optionally be recursive
+ */
+ public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+ this.recursive = recursive;
+ }
+
+ /**
+ * Start watching
+ */
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ reset();
+ }
+
+ /**
+ * Remove the watcher
+ */
+ @Override
+ public void close()
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ listeners.clear();
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+ try
+ {
+ client.watches().remove(watcher).guaranteed().inBackground().forPath(basePath);
+ }
+ catch ( Exception e )
+ {
+ ThreadUtils.checkInterrupted(e);
+ log.debug(String.format("Could not remove watcher for path: %s", basePath), e);
+ }
+ }
+ }
+
+ /**
+ * Container for setting listeners
+ *
+ * @return listener container
+ */
+ public Listenable<Watcher> getListenable()
+ {
+ return listeners;
+ }
+
+ /**
+ * Listeners are called when the persistent watcher has been successfully registered
+ * or re-registered after a connection disruption
+ *
+ * @return listener container
+ */
+ public StandardListenerManager<Runnable> getResetListenable()
+ {
+ return resetListeners;
+ }
+
+ private void reset()
+ {
+ try
+ {
+ BackgroundCallback callback = (__, event) -> {
+ if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
+ {
+ reset();
+ }
+ else
+ {
+ resetListeners.forEach(Runnable::run);
+ }
+ };
+ client.watchers().add().withMode(recursive ? AddWatchMode.PERSISTENT_RECURSIVE : AddWatchMode.PERSISTENT).inBackground(callback).usingWatcher(watcher).forPath(basePath);
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not reset persistent watch at path: " + basePath, e);
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
new file mode 100644
index 0000000..8560f87
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+// Integration tests for CuratorCache: behavior across server loss, storage without data bytes,
+// the afterInitialized() listener option, the listener builder, a custom executor and clear-on-close.
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCache extends CuratorTestBase
+{
+ // Kills the cluster instance the client is connected to and checks that the cache
+ // still holds 4 entries after the client has reconnected.
+ @Test
+ public void testServerLoss() throws Exception // mostly copied from TestPathChildrenCacheInCluster
+ {
+ try (TestingCluster cluster = new TestingCluster(3))
+ {
+ cluster.start();
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ try (CuratorCache cache = CuratorCache.build(client, "/test"))
+ {
+ cache.start();
+
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener((__, newState) -> {
+ if ( newState == ConnectionState.RECONNECTED )
+ {
+ reconnectLatch.countDown();
+ }
+ });
+ // counts down once per cache event of any type
+ CountDownLatch latch = new CountDownLatch(3);
+ cache.listenable().addListener((__, ___, ____) -> latch.countDown());
+
+ client.create().forPath("/test/one");
+ client.create().forPath("/test/two");
+ client.create().forPath("/test/three");
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+ cluster.killServer(connectionInstance);
+
+ Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+
+ timing.sleepABit();
+
+ Assert.assertEquals(cache.storage().stream().count(), 4);
+ }
+ }
+ }
+ }
+
+ // Uses a storage created with cacheBytes == false and checks that create and change
+ // listeners are still invoked.
+ @Test
+ public void testUpdateWhenNotCachingData() throws Exception // mostly copied from TestPathChildrenCache
+ {
+ CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ final CountDownLatch updatedLatch = new CountDownLatch(1);
+ final CountDownLatch addedLatch = new CountDownLatch(1);
+ client.create().creatingParentsIfNeeded().forPath("/test");
+ try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build())
+ {
+ cache.listenable().addListener(builder().forChanges((__, ___) -> updatedLatch.countDown()).build());
+ cache.listenable().addListener(builder().forCreates(__ -> addedLatch.countDown()).build());
+ cache.start();
+
+ client.create().forPath("/test/foo", "first".getBytes());
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ Assert.assertTrue(timing.awaitLatch(updatedLatch));
+ }
+ }
+ }
+
+ // Registers a listener built with afterInitialized() on a cache over a pre-populated tree and
+ // checks that initialized() is signalled and that all four nodes are in storage.
+ @Test
+ public void testAfterInitialized() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test");
+ client.create().creatingParentsIfNeeded().forPath("/test/one");
+ client.create().creatingParentsIfNeeded().forPath("/test/one/two");
+ client.create().creatingParentsIfNeeded().forPath("/test/one/two/three");
+ try (CuratorCache cache = CuratorCache.build(client, "/test"))
+ {
+ CountDownLatch initializedLatch = new CountDownLatch(1);
+ // NOTE(review): eventCount is incremented by the listener but never asserted below - confirm whether a check was intended
+ AtomicInteger eventCount = new AtomicInteger(0);
+ CuratorCacheListener listener = new CuratorCacheListener()
+ {
+ @Override
+ public void event(Type type, ChildData oldData, ChildData data)
+ {
+ eventCount.incrementAndGet();
+ }
+
+ @Override
+ public void initialized()
+ {
+ initializedLatch.countDown();
+ }
+ };
+ cache.listenable().addListener(builder().forAll(listener).afterInitialized().build());
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+ Assert.assertEquals(initializedLatch.getCount(), 0);
+ Assert.assertEquals(cache.storage().size(), 4);
+ Assert.assertTrue(cache.storage().get("/test").isPresent());
+ Assert.assertTrue(cache.storage().get("/test/one").isPresent());
+ Assert.assertTrue(cache.storage().get("/test/one/two").isPresent());
+ Assert.assertTrue(cache.storage().get("/test/one/two/three").isPresent());
+ }
+ }
+ }
+
+ // Builds one listener from all builder filters and checks, for a create, a change and a delete
+ // of "/test", which of the per-filter semaphores receive a permit.
+ @Test
+ public void testListenerBuilder() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ try (CuratorCache cache = CuratorCache.build(client, "/test"))
+ {
+ Semaphore all = new Semaphore(0);
+ Semaphore deletes = new Semaphore(0);
+ Semaphore changes = new Semaphore(0);
+ Semaphore creates = new Semaphore(0);
+ Semaphore createsAndChanges = new Semaphore(0);
+
+ CuratorCacheListener listener = builder().forAll((__, ___, ____) -> all.release()).forDeletes(__ -> deletes.release()).forChanges((__, ___) -> changes.release()).forCreates(__ -> creates.release()).forCreatesAndChanges((__, ___) -> createsAndChanges.release()).build();
+ cache.listenable().addListener(listener);
+ cache.start();
+
+ // create: expect all, creates and createsAndChanges only
+ client.create().forPath("/test");
+ Assert.assertTrue(timing.acquireSemaphore(all, 1));
+ Assert.assertTrue(timing.acquireSemaphore(creates, 1));
+ Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+ Assert.assertEquals(changes.availablePermits(), 0);
+ Assert.assertEquals(deletes.availablePermits(), 0);
+
+ // change: expect all, changes and createsAndChanges only
+ client.setData().forPath("/test", "new".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(all, 1));
+ Assert.assertTrue(timing.acquireSemaphore(changes, 1));
+ Assert.assertTrue(timing.acquireSemaphore(createsAndChanges, 1));
+ Assert.assertEquals(creates.availablePermits(), 0);
+ Assert.assertEquals(deletes.availablePermits(), 0);
+
+ // delete: expect all and deletes only
+ client.delete().forPath("/test");
+ Assert.assertTrue(timing.acquireSemaphore(all, 1));
+ Assert.assertTrue(timing.acquireSemaphore(deletes, 1));
+ Assert.assertEquals(creates.availablePermits(), 0);
+ Assert.assertEquals(changes.availablePermits(), 0);
+ Assert.assertEquals(createsAndChanges.availablePermits(), 0);
+ }
+ }
+ }
+
+ // Supplies a custom executor through the builder and checks that both the executor
+ // and the listener are invoked.
+ @Test
+ public void testOverrideExecutor() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ // two counts expected: one from the executor below, one from the listener
+ CountDownLatch latch = new CountDownLatch(2);
+ Executor executor = proc -> {
+ latch.countDown();
+ proc.run();
+ };
+ try ( CuratorCache cache = CuratorCache.builder(client, "/test").withExecutor(executor).build() )
+ {
+ cache.listenable().addListener((type, oldData, data) -> latch.countDown());
+ cache.start();
+
+ client.create().forPath("/test");
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ }
+ }
+
+ // With DO_NOT_CLEAR_ON_CLOSE the storage still holds its entries after the cache is closed;
+ // a cache built without that option has an empty storage after close.
+ @Test
+ public void testClearOnClose() throws Exception
+ {
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ CuratorCacheStorage storage;
+ client.start();
+
+ try ( CuratorCache cache = CuratorCache.builder(client, "/test").withOptions(DO_NOT_CLEAR_ON_CLOSE).build() )
+ {
+ cache.start();
+ storage = cache.storage();
+
+ client.create().forPath("/test", "foo".getBytes());
+ client.create().forPath("/test/bar", "bar".getBytes());
+ timing.sleepABit();
+ }
+ // asserted after the try block, i.e. once the cache has been closed
+ Assert.assertEquals(storage.size(), 2);
+
+ try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+ {
+ cache.start();
+ storage = cache.storage();
+
+ timing.sleepABit();
+ }
+ Assert.assertEquals(storage.size(), 0);
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
new file mode 100644
index 0000000..ab9dbe4
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheConsistency.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.ZKPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+/**
+ * Randomly create nodes in a tree while a set of CuratorCaches listens. Afterwards, validate
+ * that the caches contain the same values as ZK itself
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheConsistency extends CuratorTestBase
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ private static final Duration testLength = Duration.ofSeconds(30);
+ private static final Duration thirdOfTestLength = Duration.ofMillis(testLength.toMillis() / 3);
+ private static final Duration sleepLength = Duration.ofMillis(5);
+ private static final int nodesPerLevel = 10;
+ private static final int clusterSize = 5;
+ private static final int maxServerKills = 2;
+
+ private static final String BASE_PATH = "/test";
+
+ private class Client implements Closeable
+ {
+ private final CuratorFramework client;
+ private final CuratorCache cache;
+ private final int index;
+ private final Map<String, ChildData> listenerDataMap = new HashMap<>();
+
+ Client(int index, String connectionString, AtomicReference<Exception> errorSignal)
+ {
+ this.index = index;
+ client = buildClient(connectionString);
+ cache = CuratorCache.builder(client, BASE_PATH).withOptions(DO_NOT_CLEAR_ON_CLOSE).withExceptionHandler(errorSignal::set).build();
+
+ // listenerDataMap is a local data map that will hold values sent by listeners
+ // this way, the listener code can be tested for validity and consistency
+ CuratorCacheListener listener = builder().forCreates(node -> {
+ ChildData previous = listenerDataMap.put(node.getPath(), node);
+ if ( previous != null )
+ {
+ errorSignal.set(new Exception(String.format("Client: %d - Create for existing node: %s", index, node.getPath())));
+ }
+ }).forChanges((oldNode, node) -> {
+ ChildData previous = listenerDataMap.put(node.getPath(), node);
+ if ( (previous == null) || !Arrays.equals(previous.getData(), oldNode.getData()) )
+ {
+ errorSignal.set(new Exception(String.format("Client: %d - Bad old value for change node: %s", index, node.getPath())));
+ }
+ }).forDeletes(node -> {
+ ChildData previous = listenerDataMap.remove(node.getPath());
+ if ( previous == null )
+ {
+ errorSignal.set(new Exception(String.format("Client: %d - Delete for non-existent node: %s", index, node.getPath())));
+ }
+ }).build();
+ cache.listenable().addListener(listener);
+ }
+
+ void start()
+ {
+ client.start();
+ cache.start();
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ client.close();
+ }
+ }
+
+ @Test
+ public void testConsistencyAfterSimulation() throws Exception
+ {
+ int clientQty = random.nextInt(10, 20);
+ int maxDepth = random.nextInt(5, 10);
+
+ log.info("clientQty: {}, maxDepth: {}", clientQty, maxDepth);
+
+ List<Client> clients = Collections.emptyList();
+ Map<String, String> actualTree;
+
+ AtomicReference<Exception> errorSignal = new AtomicReference<>();
+ try (TestingCluster cluster = new TestingCluster(clusterSize))
+ {
+ cluster.start();
+
+ initializeBasePath(cluster);
+ try
+ {
+ clients = buildClients(cluster, clientQty, errorSignal);
+ workLoop(cluster, clients, maxDepth, errorSignal);
+
+ log.info("Test complete - sleeping to allow events to complete");
+ timing.sleepABit();
+ }
+ finally
+ {
+ clients.forEach(Client::close);
+ }
+
+ actualTree = buildActual(cluster);
+ }
+
+ log.info("client qty: {}", clientQty);
+
+ Map<Integer, List<String>> errorsList = clients.stream()
+ .map(client -> findErrors(client, actualTree))
+ .filter(errorsEntry -> !errorsEntry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ if ( !errorsList.isEmpty() )
+ {
+ log.error("{} clients had errors", errorsList.size());
+ errorsList.forEach((index, errorList) -> {
+ log.error("Client {}", index);
+ errorList.forEach(log::error);
+ log.error("");
+ });
+
+ Assert.fail("Errors found");
+ }
+ }
+
+    /**
+     * Reads the whole tree under BASE_PATH straight from ZooKeeper using a fresh client.
+     *
+     * @param cluster cluster to connect to
+     * @return map of node path to node data (converted to a String)
+     */
+    private Map<String, String> buildActual(TestingCluster cluster)
+    {
+        String connectString = cluster.getConnectString();
+        Map<String, String> pathToData = new HashMap<>();
+        try ( CuratorFramework reader = buildClient(connectString) )
+        {
+            reader.start();
+            buildActual(reader, pathToData, BASE_PATH);
+        }
+        return pathToData;
+    }
+
+    /**
+     * Recursively copies the node at {@code fromPath} and all of its descendants into
+     * {@code actual}. Any exception fails the test, naming the path that could not be read.
+     *
+     * @param client   started client used for the reads
+     * @param actual   map receiving path to data (as a String) entries
+     * @param fromPath node to start from
+     */
+    private void buildActual(CuratorFramework client, Map<String, String> actual, String fromPath)
+    {
+        try
+        {
+            byte[] bytes = client.getData().forPath(fromPath);
+            actual.put(fromPath, new String(bytes));
+            for ( String child : client.getChildren().forPath(fromPath) )
+            {
+                buildActual(client, actual, ZKPaths.makePath(fromPath, child));
+            }
+        }
+        catch ( Exception e )
+        {
+            // include the path so the failure says which node was the problem
+            Assert.fail("Could not read node: " + fromPath, e);
+        }
+    }
+
+    /**
+     * Creates and starts {@code clientQty} clients, indexed from 0.
+     * If creating or starting one of them fails, the clients built so far are closed
+     * before the failure is rethrown, so nothing is left running.
+     *
+     * @param cluster     cluster the clients connect to
+     * @param clientQty   number of clients to create
+     * @param errorSignal shared holder the clients report errors to
+     * @return the started clients, in index order
+     */
+    private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
+    {
+        String connectString = cluster.getConnectString();
+        List<Client> started = new ArrayList<>(clientQty);
+        try
+        {
+            for ( int index = 0; index < clientQty; ++index )
+            {
+                Client client = new Client(index, connectString, errorSignal);
+                started.add(client);    // tracked before start() so a failed start is still closed
+                client.start();
+            }
+        }
+        catch ( RuntimeException | Error e )
+        {
+            started.forEach(Client::close);
+            throw e;
+        }
+        return started;
+    }
+
+    /**
+     * Creates BASE_PATH with empty data using a short-lived client.
+     *
+     * @param cluster cluster to connect to
+     * @throws Exception if the node cannot be created
+     */
+    private void initializeBasePath(TestingCluster cluster) throws Exception
+    {
+        String connectString = cluster.getConnectString();
+        try ( CuratorFramework setupClient = buildClient(connectString) )
+        {
+            setupClient.start();
+            byte[] noData = "".getBytes();
+            setupClient.create().forPath(BASE_PATH, noData);
+        }
+    }
+
+    /**
+     * Main simulation loop. Until {@code testLength} has elapsed it repeatedly picks a
+     * random path and a random client and either deletes or creates/sets that path,
+     * pausing {@code sleepLength} between operations. Each time a third of the test
+     * length has passed since the previous check, one server is killed, up to
+     * {@code maxServerKills} in total. The loop fails the test as soon as any client
+     * has reported an error.
+     *
+     * @param cluster     cluster whose servers may be killed
+     * @param clients     clients to pick from
+     * @param maxDepth    exclusive upper bound for the depth of generated paths
+     * @param errorSignal shared holder the clients report errors to
+     * @throws Exception on interruption or if killing a server fails
+     */
+    private void workLoop(TestingCluster cluster, List<Client> clients, int maxDepth, AtomicReference<Exception> errorSignal) throws Exception
+    {
+        // obtain the generator on the thread that uses it instead of relying on a cached instance
+        ThreadLocalRandom rng = ThreadLocalRandom.current();
+
+        Instant start = Instant.now();
+        Instant lastServerKill = Instant.now();
+        int serverKillIndex = 0;
+        while ( Duration.between(start, Instant.now()).compareTo(testLength) < 0 )
+        {
+            Exception errorSignalException = errorSignal.get();
+            if ( errorSignalException != null )
+            {
+                Assert.fail("A client's error handler was called", errorSignalException);
+            }
+
+            Duration elapsedFromLastServerKill = Duration.between(lastServerKill, Instant.now());
+            if ( elapsedFromLastServerKill.compareTo(thirdOfTestLength) >= 0 )
+            {
+                // the timer is reset even when the kill budget is already used up
+                lastServerKill = Instant.now();
+                if ( serverKillIndex < maxServerKills )
+                {
+                    doKillServer(cluster, serverKillIndex++);
+                }
+            }
+
+            int thisDepth = rng.nextInt(0, maxDepth);
+            String thisPath = randomPath(thisDepth);
+            CuratorFramework client = randomClient(clients);
+            if ( rng.nextBoolean() )
+            {
+                doDelete(client, thisPath);
+            }
+            else
+            {
+                doChange(client, thisPath);
+            }
+
+            Thread.sleep(sleepLength.toMillis());
+        }
+    }
+
+    /**
+     * Creates the node at {@code thisPath} (with parents) or overwrites its data, using
+     * a random long rendered as text. A failure fails the test and keeps the original
+     * exception as the cause.
+     *
+     * @param client   client performing the write
+     * @param thisPath node to create or update
+     */
+    private void doChange(CuratorFramework client, String thisPath)
+    {
+        try
+        {
+            String thisData = Long.toString(ThreadLocalRandom.current().nextLong());
+            client.create().orSetData().creatingParentsIfNeeded().forPath(thisPath, thisData.getBytes());
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along; without it the reason for the failure is lost
+            Assert.fail("Could not create/set: " + thisPath, e);
+        }
+    }
+
+    /**
+     * Deletes the node at {@code thisPath} together with its children. BASE_PATH itself
+     * is never deleted. A failure fails the test and keeps the original exception as
+     * the cause.
+     *
+     * @param client   client performing the delete
+     * @param thisPath node to delete
+     */
+    private void doDelete(CuratorFramework client, String thisPath)
+    {
+        if ( thisPath.equals(BASE_PATH) )
+        {
+            return;
+        }
+        try
+        {
+            client.delete().quietly().deletingChildrenIfNeeded().forPath(thisPath);
+        }
+        catch ( Exception e )
+        {
+            // pass the cause along; without it the reason for the failure is lost
+            Assert.fail("Could not delete: " + thisPath, e);
+        }
+    }
+
+    /**
+     * Kills the cluster instance found at position {@code serverKillIndex} of the
+     * cluster's instance collection.
+     *
+     * @param cluster         cluster owning the instance
+     * @param serverKillIndex position of the instance to kill
+     * @throws Exception if the server cannot be killed
+     */
+    private void doKillServer(TestingCluster cluster, int serverKillIndex) throws Exception
+    {
+        log.info("Killing server {}", serverKillIndex);
+        List<InstanceSpec> instances = new ArrayList<>(cluster.getInstances());
+        InstanceSpec victim = instances.get(serverKillIndex);
+        cluster.killServer(victim);
+    }
+
+    /**
+     * Picks one of the given clients at random.
+     *
+     * @param clients non-empty list to choose from
+     * @return the CuratorFramework instance of the chosen client
+     */
+    private CuratorFramework randomClient(List<Client> clients)
+    {
+        // ask for the current thread's generator rather than using a cached instance
+        int chosen = ThreadLocalRandom.current().nextInt(clients.size());
+        return clients.get(chosen).client;
+    }
+
+    /**
+     * Compares one client's cache storage and listener-built map with the reference tree.
+     *
+     * @param client client to check
+     * @param tree   reference map of path to data read from ZooKeeper
+     * @return entry of client index to the list of problems found (empty when consistent)
+     */
+    private Map.Entry<Integer, List<String>> findErrors(Client client, Map<String, String> tree)
+    {
+        CuratorCacheStorage storage = client.cache.storage();
+        List<String> errors = new ArrayList<>();
+
+        int expectedSize = tree.size();
+        int actualSize = storage.size();
+        if ( expectedSize != actualSize )
+        {
+            errors.add(String.format("Size mismatch. Expected: %d - Actual: %d", expectedSize, actualSize));
+        }
+
+        // every reference path must be cached
+        for ( String path : tree.keySet() )
+        {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in master but not client", path));
+            }
+        }
+
+        // every cached node must exist in the reference tree with the same data,
+        // and the listener-built map must agree as well
+        storage.stream().forEach(cached -> {
+            String path = cached.getPath();
+            String treeValue = tree.get(path);
+            if ( treeValue == null )
+            {
+                errors.add(String.format("Path %s in client but not master", path));
+                return;
+            }
+
+            if ( !treeValue.equals(new String(cached.getData())) )
+            {
+                errors.add(String.format("Data at %s is not the same", path));
+            }
+
+            ChildData fromListeners = client.listenerDataMap.get(path);
+            if ( fromListeners == null )
+            {
+                errors.add(String.format("listenersMap missing data at: %s", path));
+            }
+            else if ( !treeValue.equals(new String(fromListeners.getData())) )
+            {
+                errors.add(String.format("Data at %s in listenersMap is not the same", path));
+            }
+        });
+
+        // nothing may be present in the listener-built map that the cache does not hold
+        for ( String path : client.listenerDataMap.keySet() )
+        {
+            if ( !storage.get(path).isPresent() )
+            {
+                errors.add(String.format("Path %s in listenersMap but not storage", path));
+            }
+        }
+
+        return new AbstractMap.SimpleEntry<>(client.index, errors);
+    }
+
+    /**
+     * Builds a random path below BASE_PATH.
+     *
+     * @param depth number of levels to append; zero or less yields BASE_PATH itself
+     * @return BASE_PATH followed by {@code depth} segments, each a number in [0, nodesPerLevel)
+     */
+    private String randomPath(int depth)
+    {
+        // ask for the current thread's generator rather than using a cached instance
+        ThreadLocalRandom rng = ThreadLocalRandom.current();
+        StringBuilder str = new StringBuilder(BASE_PATH);
+        for ( int level = 0; level < depth; ++level )
+        {
+            str.append("/").append(rng.nextInt(nodesPerLevel));
+        }
+        return str.toString();
+    }
+
+ @Override
+ protected void createServer()
+ {
+ // intentionally empty - this test starts its own TestingCluster (see testConsistencyAfterSimulation)
+ }
+
+    /**
+     * Creates (but does not start) a client for the given connection string, using the
+     * test's session/connection timeouts and an ExponentialBackoffRetry(100, 100) policy.
+     *
+     * @param connectionString ZooKeeper connection string
+     * @return a new, unstarted CuratorFramework
+     */
+    private CuratorFramework buildClient(String connectionString)
+    {
+        return CuratorFrameworkFactory.newClient(connectionString, timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 100));
+    }
+}
\ No newline at end of file
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
new file mode 100644
index 0000000..8baf2e2
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheEventOrdering.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * CuratorCache binding for the TestEventOrdering base test: supplies the cache under
+ * test and the way to count the entries it currently holds.
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheEventOrdering extends TestEventOrdering<CuratorCache>
+{
+    @Override
+    protected int getActualQty(CuratorCache cache)
+    {
+        CuratorCacheStorage storage = cache.storage();
+        return storage.size();
+    }
+
+    @Override
+    protected CuratorCache newCache(CuratorFramework client, String path, BlockingQueue<Event> events)
+    {
+        // only creations and deletions are forwarded; every other event type is ignored
+        CuratorCacheListener forwarder = (type, before, after) -> {
+            if ( type == CuratorCacheListener.Type.NODE_DELETED )
+            {
+                events.add(new Event(EventType.DELETED, before.getPath()));
+            }
+            else if ( type == CuratorCacheListener.Type.NODE_CREATED )
+            {
+                events.add(new Event(EventType.ADDED, after.getPath()));
+            }
+        };
+
+        CuratorCache cache = CuratorCache.build(client, path);
+        cache.listenable().addListener(forwarder);
+        cache.start();
+        return cache;
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
new file mode 100644
index 0000000..4a75acf
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.SINGLE_NODE_CACHE;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheStorage.toMap;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestCuratorCacheWrappers extends CuratorTestBase
+{
+    /**
+     * Drives a CuratorCache through a wrapped PathChildrenCacheListener and expects
+     * CHILD_ADDED, CHILD_UPDATED and CHILD_REMOVED, in that order, for "/test/one".
+     */
+    @Test
+    public void testPathChildrenCache() throws Exception    // copied from TestPathChildrenCache#testBasics()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            final long waitSeconds = timing.forWaiting().seconds();
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
+            try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+            {
+                // only events for the one child are recorded
+                PathChildrenCacheListener listener = (__, event) -> {
+                    String eventPath = event.getData().getPath();
+                    if ( eventPath.equals("/test/one") )
+                    {
+                        events.offer(event.getType());
+                    }
+                };
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                PathChildrenCacheEvent.Type afterCreate = events.poll(waitSeconds, TimeUnit.SECONDS);
+                Assert.assertEquals(afterCreate, PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                PathChildrenCacheEvent.Type afterUpdate = events.poll(waitSeconds, TimeUnit.SECONDS);
+                Assert.assertEquals(afterUpdate, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                ChildData cached = cache.storage().get("/test/one").orElseThrow(AssertionError::new);
+                Assert.assertEquals(new String(cached.getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                PathChildrenCacheEvent.Type afterDelete = events.poll(waitSeconds, TimeUnit.SECONDS);
+                Assert.assertEquals(afterDelete, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            }
+        }
+    }
+
+    /**
+     * Drives a CuratorCache through a wrapped TreeCacheListener and checks both the
+     * events delivered and the contents of the cache storage after each change.
+     */
+    @Test
+    public void testTreeCache() throws Exception    // copied from TestTreeCache#testBasics()
+    {
+        BaseTestTreeCache treeCacheBase = new BaseTestTreeCache();
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            try ( CuratorCache cache = CuratorCache.build(client, "/test") )
+            {
+                CuratorCacheStorage storage = cache.storage();
+                cache.listenable().addListener(builder().forTreeCache(client, treeCacheBase.eventListener).build());
+                cache.start();
+
+                // initial state: only the root, and prefix look-alikes have no children
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.INITIALIZED);
+                Assert.assertEquals(toMap(storage.streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(storage.streamImmediateChildren("/t").count(), 0);
+                Assert.assertEquals(storage.streamImmediateChildren("/testing").count(), 0);
+
+                // create
+                client.create().forPath("/test/one", "hey there".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
+                Assert.assertEquals(toMap(storage.streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(storage.get("/test/one").orElseThrow(AssertionError::new).getData()), "hey there");
+                Assert.assertEquals(toMap(storage.streamImmediateChildren("/test/one")).keySet(), ImmutableSet.of());
+                Assert.assertEquals(storage.streamImmediateChildren("/test/o").count(), 0);
+                Assert.assertEquals(storage.streamImmediateChildren("/test/onely").count(), 0);
+
+                // update
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
+                Assert.assertEquals(toMap(storage.streamImmediateChildren("/test")).keySet(), ImmutableSet.of("/test/one"));
+                Assert.assertEquals(new String(storage.get("/test/one").orElseThrow(AssertionError::new).getData()), "sup!");
+
+                // delete
+                client.delete().forPath("/test/one");
+                treeCacheBase.assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+                Assert.assertEquals(toMap(storage.streamImmediateChildren("/test")).keySet(), ImmutableSet.of());
+            }
+        }
+    }
+
+ @Test
+ public void testNodeCache() throws Exception // copied from TestNodeCache#testBasics()
+ {
+ try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+ {
+ client.start();
+ client.create().forPath("/test");
+
+ try (CuratorCache cache = CuratorCache.build(client, "/test/node", SINGLE_NODE_CACHE))
+ {
+ Supplier<ChildData> getRootData = () -> cache.storage().get("/test/node").orElseThrow(() -> new AssertionError("is not present"));
+ cache.start();
+
+ final Semaphore semaphore = new Semaphore(0);
+ cache.listenable().addListener(builder().forNodeCache(semaphore::release).build());
+ try
+ {
+ getRootData.get();
+ Assert.fail("Should have thrown");
+ }
+ catch ( AssertionError expected )
+ {
+ // expected
+ }
+
+ client.create().forPath("/test/node", "a".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ Assert.assertEquals(getRootData.get().getData(), "a".getBytes());
+
+ client.setData().forPath("/test/node", "b".getBytes());
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ Assert.assertEquals(getRootData.get().getData(), "b".getBytes());
+
+ client.delete().forPath("/test/node");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
+ try
+ {
+ getRootData.get();
+ Assert.fail("Should have thrown");
+ }
+ catch ( AssertionError expected )
+ {
+ // expected
+ }
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
new file mode 100644
index 0000000..3e81b63
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestWrappedNodeCache.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
+
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestWrappedNodeCache extends CuratorTestBase
+{
+    /**
+     * Deletes and then re-creates the cached node and checks that, after a listener
+     * notification for each step, the cache holds the data of the re-created node.
+     */
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/foo", "one".getBytes());
+
+            final Semaphore notifications = new Semaphore(0);
+            cache = CuratorCache.build(client, "/test/foo");
+            NodeCacheListener listener = notifications::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/foo");
+
+            // the listener is registered before start(), so the initial load notifies it
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(notifications));
+
+            Optional<ChildData> initial = rootData.get();
+            Assert.assertTrue(initial.isPresent());
+            Assert.assertEquals(initial.get().getData(), "one".getBytes());
+
+            client.delete().forPath("/test/foo");
+            Assert.assertTrue(timing.acquireSemaphore(notifications));
+            client.create().forPath("/test/foo", "two".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(notifications));
+
+            Optional<ChildData> recreated = rootData.get();
+            Assert.assertTrue(recreated.isPresent());
+            Assert.assertEquals(recreated.get().getData(), "two".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Expires the client's session and checks that the cache still holds the node's
+     * data afterwards and picks up a subsequent change to it.
+     */
+    @Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/node", "start".getBytes());
+
+            // released once the connection state becomes LOST
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            });
+
+            cache = CuratorCache.build(client,"/test/node");
+
+            Semaphore notifications = new Semaphore(0);
+            NodeCacheListener listener = notifications::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            cache.start();
+            Assert.assertTrue(timing.acquireSemaphore(notifications));
+
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+            Optional<ChildData> afterExpiration = rootData.get();
+            Assert.assertTrue(afterExpiration.isPresent());
+            Assert.assertEquals(afterExpiration.get().getData(), "start".getBytes());
+
+            client.setData().forPath("/test/node", "new data".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(notifications));
+            Optional<ChildData> afterChange = rootData.get();
+            Assert.assertTrue(afterChange.isPresent());
+            Assert.assertEquals(afterChange.get().getData(), "new data".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Basic create / update / delete cycle on a node that does not exist when the cache
+     * starts. A missing node is reported as an assertion failure rather than surfacing
+     * as a NullPointerException from dereferencing an absent value.
+     */
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCache.build(client, "/test/node");
+            cache.start();
+
+            Supplier<Optional<ChildData>> rootData = getRootDataProc(cache, "/test/node");
+
+            final Semaphore semaphore = new Semaphore(0);
+            NodeCacheListener listener = semaphore::release;
+            cache.listenable().addListener(builder().forNodeCache(listener).build());
+
+            Assert.assertFalse(rootData.get().isPresent());
+
+            client.create().forPath("/test/node", "a".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElseThrow(() -> new AssertionError("/test/node is not cached")).getData(), "a".getBytes());
+
+            client.setData().forPath("/test/node", "b".getBytes());
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertEquals(rootData.get().orElseThrow(() -> new AssertionError("/test/node is not cached")).getData(), "b".getBytes());
+
+            client.delete().forPath("/test/node");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore));
+            Assert.assertFalse(rootData.get().isPresent());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Returns a supplier that looks up {@code rootPath} in the cache's storage each
+     * time it is invoked.
+     *
+     * @param cache    cache whose storage is queried
+     * @param rootPath path to look up
+     * @return supplier of the current storage entry for the path, if any
+     */
+    private Supplier<Optional<ChildData>> getRootDataProc(CuratorCache cache, String rootPath)
+    {
+        return () -> {
+            CuratorCacheStorage storage = cache.storage();
+            return storage.get(rootPath);
+        };
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
new file mode 100644
index 0000000..534c365
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.CuratorTestBase;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Checks that a PersistentWatcher keeps delivering events after the connection has been
+ * lost and re-established, in both recursive and non-recursive mode.
+ * Grouped under zk36Group for consistency with the other test classes in this change set.
+ */
+@Test(groups = CuratorTestBase.zk36Group)
+public class TestPersistentWatcher extends CuratorTestBase
+{
+    @Test
+    public void testConnectionLostRecursive() throws Exception
+    {
+        internalTest(true);
+    }
+
+    @Test
+    public void testConnectionLost() throws Exception
+    {
+        internalTest(false);
+    }
+
+    /**
+     * Shared scenario: watch "/top/main", create a child, stop and restart the server,
+     * then verify that data changes are still reported.
+     *
+     * @param recursive whether the watcher is created in recursive mode
+     */
+    private void internalTest(boolean recursive) throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) )
+        {
+            CountDownLatch lostLatch = new CountDownLatch(1);
+            CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            client.start();
+            client.getConnectionStateListenable().addListener((__, newState) -> {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+                else if ( newState == ConnectionState.RECONNECTED )
+                {
+                    reconnectedLatch.countDown();
+                }
+            });
+
+            try ( PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", recursive) )
+            {
+                persistentWatcher.start();
+
+                BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
+                persistentWatcher.getListenable().addListener(events::add);
+
+                client.create().creatingParentsIfNeeded().forPath("/top/main/a");
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+                if ( recursive )
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main/a");
+                }
+                else
+                {
+                    Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");   // child added
+                }
+
+                server.stop();
+                Assert.assertEquals(timing.takeFromQueue(events).getState(), Watcher.Event.KeeperState.Disconnected);
+                Assert.assertTrue(timing.awaitLatch(lostLatch));
+
+                server.restart();
+                Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+
+                timing.sleepABit();     // time to allow watcher to get reset
+                events.clear();         // drop anything queued while reconnecting
+
+                if ( recursive )
+                {
+                    client.setData().forPath("/top/main/a", "foo".getBytes());
+                    Assert.assertEquals(timing.takeFromQueue(events).getType(), Watcher.Event.EventType.NodeDataChanged);
+                }
+                client.setData().forPath("/top/main", "bar".getBytes());
+                Assert.assertEquals(timing.takeFromQueue(events).getPath(), "/top/main");
+            }
+        }
+    }
+}
\ No newline at end of file