You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/10/04 14:02:01 UTC
[1/8] curator git commit: 1. Persistent watches are now optionally
recursive - support this. 2. Added bridge classes to help TreeCache users
switch to CuratorCache
Repository: curator
Updated Branches:
refs/heads/persistent-watch 9a05598c4 -> d7bf1a246
1. Persistent watches are now optionally recursive - support this. 2. Added bridge classes to help TreeCache users switch to CuratorCache
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a83e3e0b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a83e3e0b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a83e3e0b
Branch: refs/heads/persistent-watch
Commit: a83e3e0b5f1a8ea031fcf2cb32f745880ecaa8b3
Parents: 9a05598
Author: randgalt <ra...@apache.org>
Authored: Wed Aug 23 08:04:01 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Aug 23 08:04:01 2017 +0200
----------------------------------------------------------------------
.../api/AddPersistentWatchBuilder.java | 11 +-
.../api/AddPersistentWatchBuilder2.java | 25 +
.../imps/AddPersistentWatchBuilderImpl.java | 12 +-
.../framework/recipes/cache/ListenerBridge.java | 197 ++++++++
.../framework/recipes/cache/SelectorBridge.java | 55 ++
.../recipes/watch/InternalCuratorCache.java | 2 +-
.../recipes/watch/PersistentWatcher.java | 9 +-
.../recipes/cache/BaseTestTreeCache.java | 28 +-
.../framework/recipes/cache/TestTreeCache.java | 2 +-
.../recipes/cache/TestTreeCacheBridge.java | 500 +++++++++++++++++++
.../cache/TestTreeCacheBridgeRandomTree.java | 224 +++++++++
.../recipes/cache/TestTreeCacheRandomTree.java | 2 +-
.../org/apache/curator/test/WatchersDebug.java | 9 +
13 files changed, 1064 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
index 4927afc..057919e 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder.java
@@ -18,8 +18,13 @@
*/
package org.apache.curator.framework.api;
-public interface AddPersistentWatchBuilder extends
- Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
- AddPersistentWatchable<Pathable<Void>>
+public interface AddPersistentWatchBuilder extends AddPersistentWatchBuilder2
{
+ /**
+ * ZooKeeper persistent watches can optionally be recursive. See
+ * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+ *
+ * @return this
+ */
+ AddPersistentWatchBuilder2 recursive();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
new file mode 100644
index 0000000..ce1ffed
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AddPersistentWatchBuilder2.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface AddPersistentWatchBuilder2 extends
+ Backgroundable<AddPersistentWatchable<Pathable<Void>>>,
+ AddPersistentWatchable<Pathable<Void>>
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
index bf4dfb6..56f8f79 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
import org.apache.curator.RetryLoop;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.framework.api.AddPersistentWatchBuilder;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
import org.apache.curator.framework.api.AddPersistentWatchable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -37,6 +38,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
private final CuratorFrameworkImpl client;
private Watching watching = null;
private Backgrounding backgrounding = new Backgrounding();
+ private boolean recursive = false;
AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client)
{
@@ -51,6 +53,13 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
}
@Override
+ public AddPersistentWatchBuilder2 recursive()
+ {
+ recursive = true;
+ return this;
+ }
+
+ @Override
public Pathable<Void> usingWatcher(Watcher watcher)
{
watching = new Watching(client, watcher);
@@ -125,6 +134,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
(
fixedPath,
watching.getWatcher(path),
+ recursive,
new AsyncCallback.VoidCallback()
{
@Override
@@ -156,7 +166,7 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
@Override
public Void call() throws Exception
{
- client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path));
+ client.getZooKeeper().addPersistentWatch(fixedPath, watching.getWatcher(path), recursive);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
new file mode 100644
index 0000000..8a2d665
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -0,0 +1,197 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.watch.CacheEvent;
+import org.apache.curator.framework.recipes.watch.CacheListener;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * Utility to bridge old TreeCache {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}
+ * instances with new {@link org.apache.curator.framework.recipes.watch.CacheListener} so that you can
+ * use existing listeners without rewriting them.
+ * </p>
+ *
+ * <p>
+ * Create a ListenerBridge from your existing TreeCacheListener. You can then call {@link #add()}
+ * to add the bridge listener to a CuratorCache.
+ * </p>
+ */
+public class ListenerBridge implements CacheListener, ConnectionStateListener
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final Listenable<CacheListener> listenable;
+ private final TreeCacheListener listener;
+ private final AtomicBoolean added = new AtomicBoolean(false);
+
+ /**
+ * Builder style constructor
+ *
+ * @param client the client
+ * @param listenable CuratorCache listener container
+ * @param listener the old TreeCacheListener
+ * @return listener bridge
+ */
+ public static ListenerBridge wrap(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener)
+ {
+ return new ListenerBridge(client, listenable, listener);
+ }
+
+ /**
+ * @param client the client
+ * @param listenable CuratorCache listener container
+ * @param listener the old TreeCacheListener
+ */
+ public ListenerBridge(CuratorFramework client, Listenable<CacheListener> listenable, TreeCacheListener listener)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.listenable = Objects.requireNonNull(listenable, "listenable cannot be null");
+ this.listener = Objects.requireNonNull(listener, "listener cannot be null");
+ }
+
+ @Override
+ public void process(CacheEvent event, String path, CachedNode affectedNode)
+ {
+ try
+ {
+ listener.childEvent(client, toEvent(event, path, affectedNode));
+ }
+ catch ( Exception e )
+ {
+ handleException(e);
+ }
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ TreeCacheEvent.Type type = toType(newState);
+ if ( type != null )
+ {
+ try
+ {
+ listener.childEvent(client, new TreeCacheEvent(type, null));
+ }
+ catch ( Exception e )
+ {
+ handleException(e);
+ }
+ }
+ }
+
+ /**
+ * Add this listener to the listener container. Note: this method is not idempotent - calling it again without an intervening {@link #remove()} throws IllegalStateException
+ */
+ public void add()
+ {
+ Preconditions.checkState(added.compareAndSet(false, true), "Already added");
+ client.getConnectionStateListenable().addListener(this);
+ listenable.addListener(this);
+ }
+
+ /**
+ * Remove this listener from the listener container
+ */
+ public void remove()
+ {
+ if ( added.compareAndSet(true, false) )
+ {
+ client.getConnectionStateListenable().removeListener(this);
+ listenable.removeListener(this);
+ }
+ }
+
+ /**
+ * Utility - convert a new CuratorCache event to the equivalent old TreeCache event type
+ *
+ * @param event event to convert
+ * @return the equivalent TreeCache event type
+ */
+ public static TreeCacheEvent.Type toType(CacheEvent event)
+ {
+ switch ( event )
+ {
+ case NODE_CREATED:
+ return TreeCacheEvent.Type.NODE_ADDED;
+
+ case NODE_DELETED:
+ return TreeCacheEvent.Type.NODE_REMOVED;
+
+ case NODE_CHANGED:
+ return TreeCacheEvent.Type.NODE_UPDATED;
+
+ case CACHE_REFRESHED:
+ return TreeCacheEvent.Type.INITIALIZED;
+ }
+
+ throw new IllegalStateException("Unknown event: " + event);
+ }
+
+ /**
+ * Utility - convert a connection state to the equivalent old TreeCache event type
+ *
+ * @param state connection state to convert
+ * @return the equivalent TreeCache event type or null if there is no corresponding TreeCache value
+ */
+ public static TreeCacheEvent.Type toType(ConnectionState state)
+ {
+ switch ( state )
+ {
+ case RECONNECTED:
+ return TreeCacheEvent.Type.CONNECTION_RECONNECTED;
+
+ case SUSPENDED:
+ return TreeCacheEvent.Type.CONNECTION_SUSPENDED;
+
+ case LOST:
+ return TreeCacheEvent.Type.CONNECTION_LOST;
+ }
+
+ return null;
+ }
+
+ /**
+ * Convert Curator Cache listener values to TreeCache data
+ *
+ * @param path the affected path (can be null)
+ * @param affectedNode the node (can be null)
+ * @return TreeCache data or null if the path, the node or the node's data is null
+ */
+ public static ChildData toData(String path, CachedNode affectedNode)
+ {
+ if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) )
+ {
+ return new ChildData(path, affectedNode.getStat(), affectedNode.getData());
+ }
+ return null;
+ }
+
+ /**
+ * Generate a TreeCacheEvent from Curator cache event data
+ *
+ * @param event event type
+ * @param path affected path (can be null)
+ * @param affectedNode affected data (can be null)
+ * @return event - its data is always null for {@link CacheEvent#CACHE_REFRESHED}
+ */
+ public static TreeCacheEvent toEvent(CacheEvent event, String path, CachedNode affectedNode)
+ {
+ TreeCacheEvent.Type type = toType(event);
+ ChildData data = (event == CacheEvent.CACHE_REFRESHED) ? null : toData(path, affectedNode);
+ return new TreeCacheEvent(type, data);
+ }
+
+ protected void handleException(Exception e)
+ {
+ log.error("Unhandled exception in listener", e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
new file mode 100644
index 0000000..0c6af08
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
@@ -0,0 +1,55 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.recipes.watch.CacheAction;
+import org.apache.curator.framework.recipes.watch.CacheSelector;
+import java.util.Objects;
+
+/**
+ * Utility to bridge an old TreeCacheSelector to a new CuratorCache selector
+ */
+public class SelectorBridge implements CacheSelector
+{
+ private final TreeCacheSelector selector;
+ private final CacheAction action;
+
+ /**
+ * Builder style constructor
+ *
+ * @param selector the old TreeCacheSelector to bridge
+ * @return bridged selector
+ */
+ public static SelectorBridge wrap(TreeCacheSelector selector)
+ {
+ return new SelectorBridge(selector);
+ }
+
+ /**
+ * @param selector the old TreeCacheSelector to bridge
+ */
+ public SelectorBridge(TreeCacheSelector selector)
+ {
+ this(selector, CacheAction.STAT_AND_DATA);
+ }
+
+ /**
+ * @param selector the old TreeCacheSelector to bridge
+ * @param action value to return from actionForPath() for paths accepted by the selector's acceptChild()
+ */
+ public SelectorBridge(TreeCacheSelector selector, CacheAction action)
+ {
+ this.selector = Objects.requireNonNull(selector, "selector cannot be null");
+ this.action = Objects.requireNonNull(action, "action cannot be null");
+ }
+
+ @Override
+ public boolean traverseChildren(String basePath, String fullPath)
+ {
+ return selector.traverseChildren(fullPath);
+ }
+
+ @Override
+ public CacheAction actionForPath(String basePath, String fullPath)
+ {
+ return selector.acceptChild(fullPath) ? action : CacheAction.NOT_STORED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index e2b1bf3..edd08b5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -71,7 +71,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
this.basePath = Objects.requireNonNull(path, "path cannot be null");
this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null");
this.sortChildren = sortChildren;
- watcher = new PersistentWatcher(client, path)
+ watcher = new PersistentWatcher(client, path, true)
{
@Override
protected void noteWatcherReset()
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 310478a..3884a69 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.watch;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.AddPersistentWatchBuilder2;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
@@ -83,6 +84,7 @@ public class PersistentWatcher implements Closeable
};
private final CuratorFramework client;
private final String basePath;
+ private final boolean recursive;
private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
@@ -115,11 +117,13 @@ public class PersistentWatcher implements Closeable
/**
* @param client client
* @param basePath path to set the watch on
+ * @param recursive if true, the persistent watch is added as a recursive watch
*/
- public PersistentWatcher(CuratorFramework client, String basePath)
+ public PersistentWatcher(CuratorFramework client, String basePath, boolean recursive)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
+ this.recursive = recursive;
}
/**
@@ -176,7 +180,8 @@ public class PersistentWatcher implements Closeable
{
try
{
- client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
+ AddPersistentWatchBuilder2 builder = recursive ? client.addPersistentWatch().recursive() : client.addPersistentWatch();
+ builder.inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
}
catch ( Exception e )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index b984624..9cbec98 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -23,23 +23,25 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-public class BaseTestTreeCache extends BaseClassForTests
+public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
{
CuratorFramework client;
- TreeCache cache;
+ T cache;
protected final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
private final BlockingQueue<TreeCacheEvent> events = new LinkedBlockingQueue<TreeCacheEvent>();
private final Timing timing = new Timing();
@@ -86,6 +88,26 @@ public class BaseTestTreeCache extends BaseClassForTests
}
/**
+ * Construct a CuratorCache that records exceptions and automatically listens using the bridge.
+ */
+ protected CuratorCache newCacheWithListeners(CuratorFramework client, String path)
+ {
+ CuratorCache result = CuratorCacheBuilder.builder(client, path).build();
+ ListenerBridge.wrap(client, result.getListenable(), eventListener).add();
+ return result;
+ }
+
+ /**
+ * Finish constructing a CuratorCache that records exceptions and automatically listens.
+ */
+ protected CuratorCache buildCacheWithListeners(CuratorCacheBuilder builder)
+ {
+ CuratorCache result = builder.build();
+ ListenerBridge.wrap(client, result.getListenable(), eventListener).add();
+ return result;
+ }
+
+ /**
* Finish constructing a TreeCache that records exceptions and automatically listens.
*/
protected TreeCache buildWithListeners(TreeCache.Builder builder)
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index ebaf43e..ee5e918 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-public class TestTreeCache extends BaseTestTreeCache
+public class TestTreeCache extends BaseTestTreeCache<TreeCache>
{
@Test
public void testSelector() throws Exception
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
new file mode 100644
index 0000000..049daa5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
+import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
+{
+ @Test
+ public void testSelector() throws Exception
+ {
+ client.create().forPath("/root");
+ client.create().forPath("/root/n1-a");
+ client.create().forPath("/root/n1-b");
+ client.create().forPath("/root/n1-b/n2-a");
+ client.create().forPath("/root/n1-b/n2-b");
+ client.create().forPath("/root/n1-b/n2-b/n3-a");
+ client.create().forPath("/root/n1-c");
+ client.create().forPath("/root/n1-d");
+
+ TreeCacheSelector selector = new TreeCacheSelector()
+ {
+ @Override
+ public boolean traverseChildren(String fullPath)
+ {
+ return !fullPath.equals("/root/n1-b/n2-b");
+ }
+
+ @Override
+ public boolean acceptChild(String fullPath)
+ {
+ return !fullPath.equals("/root/n1-c");
+ }
+ };
+ cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+ cache.start();
+
+ assertEvent(Type.NODE_ADDED, "/root");
+ assertEvent(Type.NODE_ADDED, "/root/n1-a");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b");
+ assertEvent(Type.NODE_ADDED, "/root/n1-d");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-a");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-b");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartup() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testStartEmpty() throws Exception
+ {
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartEmptyDeeper() throws Exception
+ {
+ cache = newCacheWithListeners(client, "/test/foo/bar");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().creatingParentsIfNeeded().forPath("/test/foo");
+ assertNoMoreEvents();
+ client.create().forPath("/test/foo/bar");
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDepth0() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/1"));
+ Assert.assertNull(cache.get("/test/1"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testDepth1() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/1/sub"));
+ Assert.assertNull(cache.get("/test/2/sub"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testDepth1Deeper() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo");
+ client.create().forPath("/test/foo/bar");
+ client.create().forPath("/test/foo/bar/1", "one".getBytes());
+ client.create().forPath("/test/foo/bar/2", "two".getBytes());
+ client.create().forPath("/test/foo/bar/3", "three".getBytes());
+ client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testAsyncInitialPopulation() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testFromRoot() throws Exception // cache rooted at "/" with no depth limit
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client, "/");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test")); // containment rather than equality check on the root's children
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); // leaf node: empty child map
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testFromRootWithDepth() throws Exception // cache rooted at "/" limited by maxDepth(1): "/" and "/test" are expected, "/test/one" is not
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents(); // no NODE_ADDED is expected for /test/one
+
+ Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of()); // /test/one lies beyond the depth limit
+ Assert.assertNull(cache.get("/test/one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of()); // replaces a verbatim repeat of the assertNull above: also check the children view of the excluded node
+ }
+
+ @Test
+ public void testWithNamespace() throws Exception // cache built on a namespaced client: paths are reported without the "outer" prefix
+ {
+ client.create().forPath("/outer");
+ client.create().forPath("/outer/foo"); // sibling of the cached subtree; no event is asserted for it
+ client.create().forPath("/outer/test");
+ client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testWithNamespaceAtRoot() throws Exception // cache rooted at "/" of the "outer" namespace: everything under /outer is expected
+ {
+ client.create().forPath("/outer");
+ client.create().forPath("/outer/foo");
+ client.create().forPath("/outer/test");
+ client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/"); // the namespace root, i.e. the node created as /outer
+ assertEvent(Type.NODE_ADDED, "/foo");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test")); // exact equality is asserted here, unlike the un-namespaced root test
+ Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testSyncInitialPopulation() throws Exception // cache root does not exist at start(): INITIALIZED is asserted first, adds follow as nodes are created
+ {
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test"); // created only after the cache reported INITIALIZED
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testChildrenInitialized() throws Exception // root with three pre-existing children: one NODE_ADDED per node, then INITIALIZED
+ {
+ client.create().forPath("/test", "".getBytes());
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+ client.create().forPath("/test/3", "3".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1");
+ assertEvent(Type.NODE_ADDED, "/test/2");
+ assertEvent(Type.NODE_ADDED, "/test/3");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testUpdateWhenNotCachingData() throws Exception // statOnly() selector: add/update events are still asserted, but cached data is zero-length
+ {
+ client.create().forPath("/test");
+
+ cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/foo", "first".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ assertEvent(Type.NODE_UPDATED, "/test/foo");
+ assertNoMoreEvents();
+
+ Assert.assertNotNull(cache.get("/test/foo")); // the node entry itself is present in the cache
+ // No byte data querying the tree because we're not caching data.
+ Assert.assertEquals(cache.get("/test/foo").getData().length, 0); // getData() is an empty array here, not null
+ }
+
+ @Test
+ public void testDeleteThenCreate() throws Exception // two delete/re-create cycles on a child node of the cache root
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ assertEvent(Type.INITIALIZED);
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo", "one".getBytes()); // expected data is what the node held before deletion
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo", "two".getBytes()); // second cycle: removal is asserted with the re-created node's data
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDeleteThenCreateRoot() throws Exception // same delete/re-create cycles, but applied to the cache's own root path
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ cache = newCacheWithListeners(client, "/test/foo"); // the cache root is the node that gets deleted below
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ assertEvent(Type.INITIALIZED);
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo"); // unlike the child-node variant, no expected data is passed here
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo");
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testKilledSession() throws Exception // session kill: connection events, a fresh INITIALIZED, and removal of the ephemeral node are asserted
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/foo", "foo".getBytes()); // created with the default mode; no removal is asserted for it after the kill
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); // ephemeral: tied to the session killed below
+ assertEvent(Type.NODE_ADDED, "/test/me");
+
+ KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+ assertEvent(Type.CONNECTION_LOST);
+ assertEvent(Type.CONNECTION_RECONNECTED);
+ assertEvent(Type.INITIALIZED); // a second INITIALIZED is expected after the reconnect
+ assertEvent(Type.NODE_REMOVED, "/test/me", "data".getBytes());
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testBasics() throws Exception // add / update / delete of one child, checking events and cache contents after each step
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/t")); // a string prefix of the root path must not resolve to a node
+ Assert.assertNull(cache.get("/testing")); // nor must a string extension of it
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/o")); // same prefix/extension checks for the child path
+ Assert.assertNull(cache.get("/test/onely"));
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ assertEvent(Type.NODE_UPDATED, "/test/one");
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes()); // removal is asserted with the last data written
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception // a delete performed after cache.close() must not produce any event
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+
+ cache.close();
+ assertNoMoreEvents(); // closing by itself emits nothing
+
+ client.delete().forPath("/test/one"); // the client is still open; only the cache was closed
+ assertNoMoreEvents();
+ }
+
+ /**
+ * Make sure the cache gets to a sane state when we can't initially connect to server.
+ */
+ @Test
+ public void testServerNotStartedYet() throws Exception
+ {
+ // Stop the existing server.
+ server.stop();
+
+ // Shutdown the existing client and re-create it started.
+ client.close();
+ initCuratorFramework();
+
+ // Start the client disconnected.
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertNoMoreEvents(); // while the server is down not even INITIALIZED is expected
+
+ // Now restart the server.
+ server.restart();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test"); // created only after INITIALIZED, so the add arrives as a live event
+
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertNoMoreEvents();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
new file mode 100644
index 0000000..f304c24
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.utils.ZKPaths;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache>
+{
+ /**
+ * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
+ */
+ private static final class TestNode // in-memory mirror of one ZK node
+ {
+ String fullPath; // path handed to the ZK create/setData/delete calls in the test
+ byte[] data; // payload the cache is expected to hold for this node
+ Map<String, TestNode> children = new HashMap<String, TestNode>(); // keyed by child node name, not by full path
+
+ TestNode(String fullPath, byte[] data)
+ {
+ this.fullPath = fullPath;
+ this.data = data;
+ }
+ }
+
+ // These constants will produce a tree about 10 levels deep.
+ private static final int ITERATIONS = 1000;
+ private static final double DIVE_CHANCE = 0.9;
+ private static final int TEST_DEPTH = 5;
+
+ private final Random random = new Random();
+ private boolean withDepth = false;
+
+ /**
+ * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
+ * a TreeCache to follow the changes. At each step, assert that TreeCache matches our
+ * source-of-truth test data, and that we see exactly the set of events we expect to see.
+ */
+
+ @Test
+ public void testGiantRandomDeepTree() throws Exception {
+ client.create().forPath("/tree", null);
+ CuratorFramework cl = client.usingNamespace("tree"); // every path used below is relative to the "tree" namespace
+ cache = newCacheWithListeners(cl, "/");
+ cache.start();
+ assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
+ assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+ TestNode root = new TestNode("/", new byte[0]);
+ int maxDepth = 0; // the four counters below are only used for the stats line printed at the end
+ int adds = 0;
+ int removals = 0;
+ int updates = 0;
+
+ for ( int i = 0; i < ITERATIONS; ++i )
+ {
+ // Select a node to update, randomly navigate down through the tree
+ int depth = 0;
+ TestNode last = null; // parent of 'node'; remains null only while node == root
+ TestNode node = root;
+ while ( !node.children.isEmpty() && random.nextDouble() < DIVE_CHANCE )
+ {
+ // Go down a level in the tree. Select a random child for the next iteration.
+ last = node;
+ node = Iterables.get(node.children.values(), random.nextInt(node.children.size()));
+ ++depth;
+ }
+ maxDepth = Math.max(depth, maxDepth);
+
+ // Okay we found a node, let's do something interesting with it.
+ switch ( random.nextInt(3) )
+ {
+ case 0:
+ // Try a removal if we have no children and we're not the root node.
+ if ( node != root && node.children.isEmpty() )
+ {
+ // Delete myself from parent.
+ TestNode removed = last.children.remove(ZKPaths.getNodeFromPath(node.fullPath)); // 'last' is non-null here because node != root
+ Assert.assertSame(node, removed);
+
+ // Delete from ZK
+ cl.delete().forPath(node.fullPath);
+
+ // TreeCache should see the delete.
+ if (shouldSeeEventAt(node.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath);
+ }
+ ++removals;
+ }
+ break;
+ case 1:
+ // Do an update.
+ byte[] newData = new byte[10];
+ random.nextBytes(newData);
+
+ if ( Arrays.equals(node.data, newData) )
+ {
+ // Randomly generated the same data! Very small chance, just skip.
+ continue; // continues the outer for loop, so the end-of-iteration tree comparison is skipped too
+ }
+
+ // Update source-of-truth.
+ node.data = newData;
+
+ // Update in ZK.
+ cl.setData().forPath(node.fullPath, node.data);
+
+ // TreeCache should see the update.
+ if (shouldSeeEventAt(node.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data);
+ }
+
+ ++updates;
+ break;
+ case 2:
+ // Add a new child.
+ String name = Long.toHexString(random.nextLong());
+ if ( node.children.containsKey(name) )
+ {
+ // Randomly generated the same name! Very small chance, just skip.
+ continue; // as above: skips the end-of-iteration tree comparison
+ }
+
+ // Add a new child to our test tree.
+ byte[] data = new byte[10];
+ random.nextBytes(data);
+ TestNode child = new TestNode(ZKPaths.makePath(node.fullPath, name), data);
+ node.children.put(name, child);
+
+ // Add to ZK.
+ cl.create().forPath(child.fullPath, child.data);
+
+ // TreeCache should see the add.
+ if (shouldSeeEventAt(child.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data);
+ }
+
+ ++adds;
+ break;
+ }
+
+ // Each iteration, ensure the cached state matches our source-of-truth tree.
+ assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root); // the root is checked separately; assertTreeEquals only compares children
+ assertTreeEquals(cache, root, 0);
+ }
+
+ // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320
+ // We get more adds than removals because removals only happen if we're at a leaf.
+ System.out.println(String.format("maxDepth: %s, adds: %s, removals: %s, updates: %s", maxDepth, adds, removals, updates));
+ assertNoMoreEvents();
+ }
+
+ /**
+ * Returns true if we should see an event at this path: always when {@code withDepth} is false, otherwise only when the split path has at most {@code TEST_DEPTH} elements.
+ */
+ private boolean shouldSeeEventAt(String fullPath)
+ {
+ return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH;
+ }
+
+ /**
+ * Recursively assert that the cache's children at each path match the expected node's children (names and data).
+ */
+ private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+ {
+ String path = expectedNode.fullPath;
+ Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+ Assert.assertNotNull(cacheChildren, path); // path doubles as the failure message
+
+ if (withDepth && depth == TEST_DEPTH) {
+ return; // at the depth limit only the non-null child map is checked, not its contents
+ }
+
+ Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path);
+
+ for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
+ {
+ String nodeName = entry.getKey();
+ CachedNode childData = cacheChildren.get(nodeName); // non-null is enforced by assertNodeEquals on the converted value below
+ TestNode expectedChild = entry.getValue();
+ assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+ assertTreeEquals(cache, expectedChild, depth + 1);
+ }
+ }
+
+ /**
+ * Assert that the given node data is non-null and that its bytes match the expected test node's data.
+ */
+ private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
+ {
+ String path = expectedNode.fullPath; // used only as the assertion failure message
+ Assert.assertNotNull(actualChild, path);
+ Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
index 96ce75c..1a9e366 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheRandomTree.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-public class TestTreeCacheRandomTree extends BaseTestTreeCache
+public class TestTreeCacheRandomTree extends BaseTestTreeCache<TreeCache>
{
/**
* A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
http://git-wip-us.apache.org/repos/asf/curator/blob/a83e3e0b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
index e4c3b7e..e884b8c 100644
--- a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -27,16 +27,19 @@ public class WatchersDebug
private static final Method getDataWatches;
private static final Method getExistWatches;
private static final Method getChildWatches;
+ private static final Method getPersistentWatches;
static
{
Method localGetDataWatches = null;
Method localGetExistWatches = null;
Method localGetChildWatches = null;
+ Method localGetPersistentWatches = null;
try
{
localGetDataWatches = getMethod("getDataWatches");
localGetExistWatches = getMethod("getExistWatches");
localGetChildWatches = getMethod("getChildWatches");
+ localGetPersistentWatches = getMethod("getPersistentWatches");
}
catch ( NoSuchMethodException e )
{
@@ -45,6 +48,7 @@ public class WatchersDebug
getDataWatches = localGetDataWatches;
getExistWatches = localGetExistWatches;
getChildWatches = localGetChildWatches;
+ getPersistentWatches = localGetPersistentWatches;
}
public static List<String> getDataWatches(ZooKeeper zooKeeper)
@@ -62,6 +66,11 @@ public class WatchersDebug
return callMethod(zooKeeper, getChildWatches);
}
+ public static List<String> getPersistentWatches(ZooKeeper zooKeeper) // persistent-watch counterpart of the getDataWatches/getExistWatches/getChildWatches accessors
+ {
+ return callMethod(zooKeeper, getPersistentWatches); // the Method is looked up by name in the static initializer; NOTE(review): behavior when that lookup failed depends on callMethod — confirm
+ }
+
private WatchersDebug()
{
}
[7/8] curator git commit: 1. Updates from latest ZK changes. 2. Added
async version for creating persistent watch
Posted by ra...@apache.org.
1. Updates from latest ZK changes. 2. Added async version for creating persistent watch
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a27f8768
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a27f8768
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a27f8768
Branch: refs/heads/persistent-watch
Commit: a27f87688c4a3a655d3970cbb5166c6286b5e147
Parents: 9c1186b
Author: randgalt <ra...@apache.org>
Authored: Wed Oct 4 16:01:42 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Oct 4 16:01:42 2017 +0200
----------------------------------------------------------------------
.../async/api/AsyncPersistentWatchBuilder.java | 33 +++++++++
.../AsyncPersistentWatchBuilderImpl.java | 75 ++++++++++++++++++++
2 files changed, 108 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a27f8768/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
new file mode 100644
index 0000000..b794d88
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncPersistentWatchBuilder.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.api;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.x.async.AsyncStage;
+
+ public interface AsyncPersistentWatchBuilder extends AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> // async-DSL builder for persistent watches; the terminal path call yields an AsyncStage<Void>
+{
+ /**
+ * ZooKeeper persistent watches can optionally be recursive. See
+ * {@link org.apache.zookeeper.ZooKeeper#addPersistentWatch(String, org.apache.zookeeper.Watcher, boolean)}
+ *
+ * @return this
+ */
+ AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive(); // return type is narrowed so that the next step is choosing a watcher
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/a27f8768/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
new file mode 100644
index 0000000..4f9fd87
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncPersistentWatchBuilderImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.details;
+
+import org.apache.curator.framework.api.AddPersistentWatchable;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.AddPersistentWatchBuilderImpl;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.Watching;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.AsyncPathable;
+import org.apache.curator.x.async.api.AsyncPersistentWatchBuilder;
+import org.apache.zookeeper.Watcher;
+
+import static org.apache.curator.x.async.details.BackgroundProcs.ignoredProc;
+import static org.apache.curator.x.async.details.BackgroundProcs.safeCall;
+
+class AsyncPersistentWatchBuilderImpl implements AsyncPersistentWatchBuilder, AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>>, AsyncPathable<AsyncStage<Void>> // collects watcher/recursive settings and delegates to AddPersistentWatchBuilderImpl in forPath()
+{
+ private final CuratorFrameworkImpl client;
+ private final Filters filters;
+ private Watching watching = null; // set by usingWatcher(...); handed to the delegate builder unchanged
+ private boolean recursive = false; // non-recursive unless recursive() is called
+
+ AsyncPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Filters filters)
+ {
+ this.client = client;
+ this.filters = filters;
+ }
+
+ @Override
+ public AddPersistentWatchable<AsyncPathable<AsyncStage<Void>>> recursive()
+ {
+ recursive = true;
+ return this;
+ }
+
+ @Override
+ public AsyncPathable<AsyncStage<Void>> usingWatcher(Watcher watcher)
+ {
+ watching = new Watching(client, watcher);
+ return this;
+ }
+
+ @Override
+ public AsyncPathable<AsyncStage<Void>> usingWatcher(CuratorWatcher watcher)
+ {
+ watching = new Watching(client, watcher);
+ return this;
+ }
+
+ @Override
+ public AsyncStage<Void> forPath(String path)
+ {
+ BuilderCommon<Void> common = new BuilderCommon<>(filters, ignoredProc); // supplies both the backgrounding and the callback used below
+ AddPersistentWatchBuilderImpl builder = new AddPersistentWatchBuilderImpl(client, watching, common.backgrounding, recursive); // NOTE(review): watching is still null if no usingWatcher(...) call preceded this — confirm the delegate tolerates that
+ return safeCall(common.internalCallback, () -> builder.forPath(path)); // NOTE(review): safeCall presumably reports exceptions from builder.forPath through the returned stage — confirm in BackgroundProcs
+ }
+}
[2/8] curator git commit: licenses
Posted by ra...@apache.org.
licenses
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bf7c5ecf
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bf7c5ecf
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bf7c5ecf
Branch: refs/heads/persistent-watch
Commit: bf7c5ecf548e18ead3e5bd9b0be05df42367035a
Parents: a83e3e0
Author: randgalt <ra...@apache.org>
Authored: Wed Aug 23 08:18:51 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Aug 23 08:18:51 2017 +0200
----------------------------------------------------------------------
.../framework/recipes/cache/ListenerBridge.java | 18 ++++++++++++++++++
.../framework/recipes/cache/SelectorBridge.java | 18 ++++++++++++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bf7c5ecf/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
index 8a2d665..29e7b6c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.cache;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/curator/blob/bf7c5ecf/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
index 0c6af08..815345c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/SelectorBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.recipes.watch.CacheAction;
[8/8] curator git commit: 1. Updates from latest ZK changes. 2. Added
async version for creating persistent watch
Posted by ra...@apache.org.
1. Updates from latest ZK changes. 2. Added async version for creating persistent watch
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d7bf1a24
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d7bf1a24
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d7bf1a24
Branch: refs/heads/persistent-watch
Commit: d7bf1a2461ecfa0bf708b3890dc4b3a019cacdb0
Parents: a27f876
Author: randgalt <ra...@apache.org>
Authored: Wed Oct 4 16:01:51 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Wed Oct 4 16:01:51 2017 +0200
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 5 +++++
.../imps/AddPersistentWatchBuilderImpl.java | 10 +++++++++-
.../apache/curator/framework/imps/Watching.java | 10 +++++-----
.../framework/recipes/cache/TreeCacheBridge.java | 18 ++++++++++++++++++
.../recipes/cache/TreeCacheBridgeImpl.java | 18 ++++++++++++++++++
.../framework/recipes/watch/CacheSelectors.java | 6 ++----
.../framework/recipes/watch/CachedNode.java | 18 ++++++++++++++++++
.../x/async/api/AsyncCuratorFrameworkDsl.java | 7 +++++++
.../async/details/AsyncCuratorFrameworkImpl.java | 6 ++++++
.../curator/x/async/TestBasicOperations.java | 14 ++++++++++++++
10 files changed, 102 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index ce31d08..f075daa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -193,6 +193,11 @@ public interface CuratorFramework extends Closeable
*/
public SyncBuilder sync();
+ /**
+ * Start a persistent watch builder
+ *
+ * @return builder object
+ */
public AddPersistentWatchBuilder addPersistentWatch();
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
index 56f8f79..4f51f39 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/AddPersistentWatchBuilderImpl.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
+public class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathable<Void>, BackgroundOperation<String>
{
private final CuratorFrameworkImpl client;
private Watching watching = null;
@@ -45,6 +45,14 @@ class AddPersistentWatchBuilderImpl implements AddPersistentWatchBuilder, Pathab
this.client = client;
}
+ public AddPersistentWatchBuilderImpl(CuratorFrameworkImpl client, Watching watching, Backgrounding backgrounding, boolean recursive)
+ {
+ this.client = client;
+ this.watching = watching;
+ this.backgrounding = backgrounding;
+ this.recursive = recursive;
+ }
+
@Override
public AddPersistentWatchable<Pathable<Void>> inBackground()
{
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index daa5dd3..5bad7e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -23,7 +23,7 @@ import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
-class Watching
+public class Watching
{
private final Watcher watcher;
private final CuratorWatcher curatorWatcher;
@@ -31,7 +31,7 @@ class Watching
private final CuratorFrameworkImpl client;
private NamespaceWatcher namespaceWatcher;
- Watching(CuratorFrameworkImpl client, boolean watched)
+ public Watching(CuratorFrameworkImpl client, boolean watched)
{
this.client = client;
this.watcher = null;
@@ -39,7 +39,7 @@ class Watching
this.watched = watched;
}
- Watching(CuratorFrameworkImpl client, Watcher watcher)
+ public Watching(CuratorFrameworkImpl client, Watcher watcher)
{
this.client = client;
this.watcher = watcher;
@@ -47,7 +47,7 @@ class Watching
this.watched = false;
}
- Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
+ public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
{
this.client = client;
this.watcher = null;
@@ -55,7 +55,7 @@ class Watching
this.watched = false;
}
- Watching(CuratorFrameworkImpl client)
+ public Watching(CuratorFrameworkImpl client)
{
this.client = client;
watcher = null;
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
index 8b6f37a..4a0eed9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.listen.Listenable;
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
index 0198aa4..35fcac4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.cache;
import com.google.common.util.concurrent.MoreExecutors;
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index 8814e57..ed6c6fa 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -21,9 +21,8 @@ package org.apache.curator.framework.recipes.watch;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.server.PathIterator;
+import org.apache.zookeeper.server.PathParentIterator;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -315,10 +314,9 @@ public class CacheSelectors
private CacheSelector getSelector(String fullPath)
{
- String parent = ZKPaths.getPathAndNode(fullPath).getPath();
for ( CompositeEntry entry : entries )
{
- PathIterator pathIterator = new PathIterator(fullPath);
+ PathParentIterator pathIterator = PathParentIterator.forAll(fullPath);
while ( pathIterator.hasNext() )
{
if ( pathIterator.next().equals(entry.path) )
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
index b07993f..18131cf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.curator.framework.recipes.watch;
import org.apache.zookeeper.data.Stat;
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
index bc66bb6..c1748d0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncCuratorFrameworkDsl.java
@@ -84,6 +84,13 @@ public interface AsyncCuratorFrameworkDsl extends WatchableAsyncCuratorFramework
AsyncReconfigBuilder reconfig();
/**
+ * Start a persistent watch builder
+ *
+ * @return builder object
+ */
+ AsyncPersistentWatchBuilder addPersistentWatch();
+
+ /**
* Start a transaction builder
*
* @return builder object
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
index 167cf50..afa1de0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java
@@ -124,6 +124,12 @@ public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework
}
@Override
+ public AsyncPersistentWatchBuilderImpl addPersistentWatch()
+ {
+ return new AsyncPersistentWatchBuilderImpl(client, filters);
+ }
+
+ @Override
public AsyncMultiTransaction transaction()
{
return operations -> {
http://git-wip-us.apache.org/repos/asf/curator/blob/d7bf1a24/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index f814146..aed1385 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -32,8 +32,10 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import static java.util.EnumSet.of;
import static org.apache.curator.x.async.api.CreateOption.compress;
@@ -199,4 +201,16 @@ public class TestBasicOperations extends CompletableBaseClassForTests
complete(client.getData().storingStatIn(stat).forPath("/test"));
Assert.assertEquals(stat.getDataLength(), "hey".length());
}
+
+ @Test
+ public void testPersistentRecursiveWatch() throws Exception
+ {
+ BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>();
+ Watcher watcher = event -> events.add(event.getType());
+ complete(client.addPersistentWatch().recursive().usingWatcher(watcher).forPath("/a/b"));
+ client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/c");
+ client.unwrap().create().creatingParentContainersIfNeeded().forPath("/a/b/d");
+ Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+ Assert.assertEquals(timing.takeFromQueue(events), Watcher.Event.EventType.NodeCreated);
+ }
}
[5/8] curator git commit: Abstracted the TreeCache public API and
then added an alternate implementation that uses the new CuratorCache instead of
TreeCache. This should make porting older code much easier.
Posted by ra...@apache.org.
Abstracted the TreeCache public API and then added an alternate implementation that uses the new CuratorCache instead of TreeCache. This should make porting older code much easier.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/02073a71
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/02073a71
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/02073a71
Branch: refs/heads/persistent-watch
Commit: 02073a71a3a165babba9a7db84449ef9e7439a19
Parents: 570023d
Author: randgalt <ra...@apache.org>
Authored: Sun Aug 27 19:03:28 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun Aug 27 19:03:28 2017 +0200
----------------------------------------------------------------------
.../framework/recipes/cache/ListenerBridge.java | 26 +-
.../framework/recipes/cache/TreeCache.java | 88 ++--
.../recipes/cache/TreeCacheBridge.java | 49 ++
.../recipes/cache/TreeCacheBridgeImpl.java | 78 +++
.../framework/recipes/watch/CacheSelectors.java | 1 +
.../framework/recipes/watch/CachedNode.java | 104 +---
.../framework/recipes/watch/CachedNodeImpl.java | 90 ++++
.../framework/recipes/watch/CachedNodeMap.java | 2 +-
.../framework/recipes/watch/CuratorCache.java | 4 +-
.../recipes/watch/CuratorCacheBase.java | 10 +-
.../recipes/watch/CuratorCacheBuilder.java | 20 +-
.../recipes/watch/InternalCuratorCache.java | 23 +-
.../recipes/watch/InternalNodeCache.java | 10 +-
.../recipes/cache/BaseTestTreeCache.java | 20 +
.../recipes/cache/TestTreeCacheBridge.java | 199 +++++---
.../cache/TestTreeCacheBridgeRandomTree.java | 42 +-
.../cache/TestTreeCacheBridgeWrapper.java | 500 +++++++++++++++++++
.../TestTreeCacheBridgeWrapperRandomTree.java | 224 +++++++++
18 files changed, 1242 insertions(+), 248 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
index 29e7b6c..693e1da 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/ListenerBridge.java
@@ -26,8 +26,12 @@ import org.apache.curator.framework.recipes.watch.CacheListener;
import org.apache.curator.framework.recipes.watch.CachedNode;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -186,7 +190,7 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener
*/
public static ChildData toData(String path, CachedNode affectedNode)
{
- if ( (path != null) && (affectedNode != null) && (affectedNode.getData() != null) )
+ if ( (path != null) && (affectedNode != null) )
{
return new ChildData(path, affectedNode.getStat(), affectedNode.getData());
}
@@ -208,6 +212,26 @@ public class ListenerBridge implements CacheListener, ConnectionStateListener
return new TreeCacheEvent(type, data);
}
+ public static Map<String, ChildData> toData(String basePath, Map<String, CachedNode> from)
+ {
+ if ( from.isEmpty() )
+ {
+ return Collections.emptyMap();
+ }
+
+ Map<String, ChildData> mapped = new HashMap<>();
+ for ( Map.Entry<String, CachedNode> entry : from.entrySet() )
+ {
+ String path = entry.getKey();
+ ChildData childData = toData(ZKPaths.makePath(basePath, path), entry.getValue());
+ if ( childData != null )
+ {
+ mapped.put(path, childData);
+ }
+ }
+ return mapped;
+ }
+
protected void handleException(Exception e)
{
log.error("Unhandled exception in listener", e);
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 94e6774..4e663cd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -31,7 +31,11 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.watch.CacheAction;
+import org.apache.curator.framework.recipes.watch.CacheSelector;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
@@ -43,7 +47,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -73,7 +76,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
* @deprecated use {@link CuratorCache}
*/
@Deprecated
-public class TreeCache implements Closeable
+public class TreeCache implements TreeCacheBridge
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
private final boolean createParentNodes;
@@ -109,6 +112,53 @@ public class TreeCache implements Closeable
return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
}
+ public TreeCacheBridge buildBridge()
+ {
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, path);
+ CacheAction cacheAction;
+ if ( cacheData )
+ {
+ cacheAction = dataIsCompressed ? CacheAction.STAT_AND_UNCOMPRESSED_DATA : CacheAction.STAT_AND_DATA;
+ }
+ else
+ {
+ cacheAction = dataIsCompressed ? CacheAction.UNCOMPRESSED_STAT_ONLY : CacheAction.STAT_ONLY;
+ }
+ final CacheSelector maxDepthSelector = (maxDepth != Integer.MAX_VALUE) ? CacheSelectors.maxDepth(maxDepth) : null;
+ final CacheAction sealedCacheAction = cacheAction;
+ CacheSelector cacheSelector = new CacheSelector()
+ {
+ @Override
+ public boolean traverseChildren(String basePath, String fullPath)
+ {
+ //noinspection SimplifiableIfStatement
+ if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) )
+ {
+ return false;
+ }
+ return selector.traverseChildren(fullPath);
+ }
+
+ @Override
+ public CacheAction actionForPath(String basePath, String fullPath)
+ {
+ if ( (maxDepthSelector != null) && !maxDepthSelector.traverseChildren(basePath, fullPath) )
+ {
+ return CacheAction.NOT_STORED;
+ }
+ return selector.acceptChild(fullPath) ? sealedCacheAction : CacheAction.NOT_STORED;
+ }
+ };
+ builder = builder.withCacheSelector(cacheSelector).withDefaultData(null);
+
+ if ( createParentNodes )
+ {
+ LOG.warn("setCreateParentNodes(true) is not supported by TreeCacheBridge. Use EnsureContainers or MigrationManager instead");
+ }
+
+ return new TreeCacheBridgeImpl(client, builder.build());
+ }
+
/**
* Sets whether or not to cache byte data per node; default {@code true}.
*/
@@ -579,12 +629,7 @@ public class TreeCache implements Closeable
this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
}
- /**
- * Start the cache. The cache is not started automatically. You must call this method.
- *
- * @return this
- * @throws Exception errors
- */
+ @Override
public TreeCache start() throws Exception
{
Preconditions.checkState(treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED), "already started");
@@ -600,9 +645,6 @@ public class TreeCache implements Closeable
return this;
}
- /**
- * Close/end the cache.
- */
@Override
public void close()
{
@@ -624,11 +666,7 @@ public class TreeCache implements Closeable
}
}
- /**
- * Return the cache listenable
- *
- * @return listenable
- */
+ @Override
public Listenable<TreeCacheListener> getListenable()
{
return listeners;
@@ -680,14 +718,7 @@ public class TreeCache implements Closeable
return current;
}
- /**
- * Return the current set of children at the given path, mapped by child name. There are no
- * guarantees of accuracy; this is merely the most recent view of the data. If there is no
- * node at this path, {@code null} is returned.
- *
- * @param fullPath full path to the node to check
- * @return a possibly-empty list of children if the node is alive, or null
- */
+ @Override
public Map<String, ChildData> getCurrentChildren(String fullPath)
{
TreeNode node = find(fullPath);
@@ -721,14 +752,7 @@ public class TreeCache implements Closeable
return node.nodeState == NodeState.LIVE ? result : null;
}
- /**
- * Return the current data for the given path. There are no guarantees of accuracy. This is
- * merely the most recent view of the data. If there is no node at the given path,
- * {@code null} is returned.
- *
- * @param fullPath full path to the node to check
- * @return data if the node is alive, or null
- */
+ @Override
public ChildData getCurrentData(String fullPath)
{
TreeNode node = find(fullPath);
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
new file mode 100644
index 0000000..9c383af
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -0,0 +1,49 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.Map;
+
+public interface TreeCacheBridge extends Closeable
+{
+ /**
+ * Start the cache. The cache is not started automatically. You must call this method.
+ *
+ * @return this
+ * @throws Exception errors
+ */
+ TreeCacheBridge start() throws Exception;
+
+ /**
+ * Close/end the cache.
+ */
+ @Override
+ void close();
+
+ /**
+ * Return the cache listenable
+ *
+ * @return listenable
+ */
+ Listenable<TreeCacheListener> getListenable();
+
+ /**
+ * Return the current set of children at the given path, mapped by child name. There are no
+ * guarantees of accuracy; this is merely the most recent view of the data. If there is no
+ * node at this path, {@code null} is returned.
+ *
+ * @param fullPath full path to the node to check
+ * @return a possibly-empty map of children if the node is alive, or null
+ */
+ Map<String, ChildData> getCurrentChildren(String fullPath);
+
+ /**
+ * Return the current data for the given path. There are no guarantees of accuracy. This is
+ * merely the most recent view of the data. If there is no node at the given path,
+ * {@code null} is returned.
+ *
+ * @param fullPath full path to the node to check
+ * @return data if the node is alive, or null
+ */
+ ChildData getCurrentData(String fullPath);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
new file mode 100644
index 0000000..0198aa4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridgeImpl.java
@@ -0,0 +1,78 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+public class TreeCacheBridgeImpl implements TreeCacheBridge, Listenable<TreeCacheListener>
+{
+ private final CuratorFramework client;
+ private final CuratorCache cache;
+ private final Map<TreeCacheListener, ListenerBridge> listenerMap = new ConcurrentHashMap<>();
+
+ public TreeCacheBridgeImpl(CuratorFramework client, CuratorCache cache)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.cache = Objects.requireNonNull(cache, "cache cannot be null");
+ }
+
+ @Override
+ public TreeCacheBridge start()
+ {
+ cache.start();
+ return this;
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ }
+
+ @Override
+ public Listenable<TreeCacheListener> getListenable()
+ {
+ return this;
+ }
+
+ @Override
+ public Map<String, ChildData> getCurrentChildren(String fullPath)
+ {
+ return ListenerBridge.toData(fullPath, cache.childrenAtPath(fullPath));
+ }
+
+ @Override
+ public ChildData getCurrentData(String fullPath)
+ {
+ return ListenerBridge.toData(fullPath, cache.get(fullPath));
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener)
+ {
+ addListener(listener, MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public void addListener(TreeCacheListener listener, Executor executor)
+ {
+ ListenerBridge listenerBridge = ListenerBridge.wrap(client, cache.getListenable(), listener);
+ listenerBridge.add();
+ listenerMap.put(listener, listenerBridge);
+ }
+
+ @Override
+ public void removeListener(TreeCacheListener listener)
+ {
+ ListenerBridge listenerBridge = listenerMap.remove(listener);
+ if ( listenerBridge != null )
+ {
+ listenerBridge.remove();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
index eaf1145..8814e57 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheSelectors.java
@@ -23,6 +23,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.server.PathIterator;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
index f3cd18a..b07993f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNode.java
@@ -1,108 +1,10 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
package org.apache.curator.framework.recipes.watch;
import org.apache.zookeeper.data.Stat;
-import java.util.Arrays;
-import java.util.Objects;
-/**
- * Represents the data for a cached node
- */
-public class CachedNode
+public interface CachedNode
{
- private final Stat stat;
- private final byte[] data;
+ Stat getStat();
- private static final byte[] defaultData = new byte[0];
-
- /**
- * Creates an empty node
- */
- public CachedNode()
- {
- this(new Stat(), defaultData);
- }
-
- /**
- * A node with a stat but empty data
- *
- * @param stat the stat
- */
- public CachedNode(Stat stat)
- {
- this(stat, defaultData);
- }
-
- /**
- * @param stat the stat
- * @param data uncompressed data. If <code>null</code> an empty array is substituted.
- */
- public CachedNode(Stat stat, byte[] data)
- {
- this.stat = Objects.requireNonNull(stat, "stat cannot be null");
- this.data = (data != null) ? data : defaultData;
- }
-
- public Stat getStat()
- {
- return stat;
- }
-
- public byte[] getData()
- {
- return data;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if ( this == o )
- {
- return true;
- }
- if ( o == null || getClass() != o.getClass() )
- {
- return false;
- }
-
- CachedNode that = (CachedNode)o;
-
- //noinspection SimplifiableIfStatement
- if ( !stat.equals(that.stat) )
- {
- return false;
- }
- return Arrays.equals(data, that.data);
- }
-
- @Override
- public int hashCode()
- {
- int result = stat.hashCode();
- result = 31 * result + Arrays.hashCode(data);
- return result;
- }
-
- @Override
- public String toString()
- {
- return "CachedNode{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
- }
+ byte[] getData();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
new file mode 100644
index 0000000..91b6a5e
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.zookeeper.data.Stat;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Represents the data for a cached node
+ */
+public class CachedNodeImpl implements CachedNode
+{
+ private final Stat stat;
+ private final byte[] data;
+
+ /**
+ * @param stat the stat
+ * @param data uncompressed data or null - NOTE: ownership is taken of the given data object
+ */
+ public CachedNodeImpl(Stat stat, byte[] data)
+ {
+ this.stat = Objects.requireNonNull(stat, "stat cannot be null");
+ this.data = data;
+ }
+
+ @Override
+ public Stat getStat()
+ {
+ return stat;
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+
+ CachedNodeImpl that = (CachedNodeImpl)o;
+
+ //noinspection SimplifiableIfStatement
+ if ( !stat.equals(that.stat) )
+ {
+ return false;
+ }
+ return Arrays.equals(data, that.data);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = stat.hashCode();
+ result = 31 * result + Arrays.hashCode(data);
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CachedNodeImpl{" + "stat=" + stat + ", data=" + Arrays.toString(data) + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
index d59ffae..adcd458 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CachedNodeMap.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.Map;
/**
- * Interface for a container of path to {@link org.apache.curator.framework.recipes.watch.CachedNode}
+ * Interface for a container that maps a path to its {@link CachedNode}
*/
public interface CachedNodeMap
{
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index e97157e..2742000 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -150,7 +150,7 @@ public interface CuratorCache extends Closeable
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+ * calls to {@link CachedNode#getData()} for this node will return the cache's default data (an empty array unless changed via {@link CuratorCacheBuilder#withDefaultData(byte[])}).
*
* @param path the path of the node to clear
*/
@@ -158,7 +158,7 @@ public interface CuratorCache extends Closeable
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+ * calls to {@link CachedNode#getData()} for this node will return the cache's default data (an empty array unless changed via {@link CuratorCacheBuilder#withDefaultData(byte[])}).
*
* @param path the path of the node to clear
* @param ifVersion if non-negative, only clear the data if the data's version matches this version
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 1aa5b5d..c072f7c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
abstract class CuratorCacheBase implements CuratorCache
{
protected final CachedNodeMap cache;
+ protected final byte[] defaultData;
private final String mainPath;
private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -54,11 +55,12 @@ abstract class CuratorCacheBase implements CuratorCache
CLOSED
}
- protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents)
+ protected CuratorCacheBase(String mainPath, CachedNodeMap cache, boolean sendRefreshEvents, byte[] defaultData)
{
this.mainPath = Objects.requireNonNull(mainPath, "mainPath cannot be null");
this.cache = Objects.requireNonNull(cache, "cache cannot be null");
this.sendRefreshEvents = sendRefreshEvents;
+ this.defaultData = defaultData;
}
@Override
@@ -140,7 +142,7 @@ abstract class CuratorCacheBase implements CuratorCache
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+ * calls to {@link CachedNode#getData()} for this node will return the cache's default data (an empty array unless changed via {@link CuratorCacheBuilder#withDefaultData(byte[])}).
*
* @param path the path of the node to clear
*/
@@ -152,7 +154,7 @@ abstract class CuratorCacheBase implements CuratorCache
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
- * calls to {@link CachedNode#getData()} for this node will return <code>null</code>.
+ * calls to {@link CachedNode#getData()} for this node will return the cache's default data (an empty array unless changed via {@link CuratorCacheBuilder#withDefaultData(byte[])}).
*
* @param path the path of the node to clear
* @param ifVersion if non-negative, only clear the data if the data's version matches this version
@@ -166,7 +168,7 @@ abstract class CuratorCacheBase implements CuratorCache
{
if ( (ifVersion < 0) || ((data.getStat() != null) && (ifVersion == data.getStat().getVersion())) )
{
- return cache.replace(path, data, new CachedNode(data.getStat()));
+ return cache.replace(path, data, new CachedNodeImpl(data.getStat(), defaultData));
}
}
return false;
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 10f8bc2..eb8d0df 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.watch;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -40,6 +41,7 @@ public class CuratorCacheBuilder
private boolean usingSoftValues = false;
private Long expiresAfterWriteMs = null;
private Long expiresAfterAccessMs = null;
+ private byte[] defaultData = new byte[0];
/**
* Start a new builder for the given client and main path
@@ -66,10 +68,10 @@ public class CuratorCacheBuilder
{
Preconditions.checkState(cacheSelector == null, "Single node mode does not support CacheSelectors");
Preconditions.checkState(singleNodeCacheAction != CacheAction.UNCOMPRESSED_STAT_ONLY, "Single node mode does not support UNCOMPRESSED_STAT_ONLY");
- return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart);
+ return new InternalNodeCache(client, path, singleNodeCacheAction, cachedNodeMap, sendRefreshEvents, refreshOnStart, defaultData);
}
- return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren);
+ return new InternalCuratorCache(client, path, cacheSelector, cachedNodeMap, sendRefreshEvents, refreshOnStart, sortChildren, defaultData);
}
/**
@@ -215,6 +217,20 @@ public class CuratorCacheBuilder
return this;
}
+ /**
+ * If a node does not contain data (due to the CacheSelector or other reason), the default is
+ * to set {@link CachedNodeImpl#getData()} to an empty byte array (<code>byte[0]</code>). Use this
+ * method to change to another value or pass <code>null</code>
+ *
+ * @param data data bytes or null
+ * @return this
+ */
+ public CuratorCacheBuilder withDefaultData(byte[] data)
+ {
+ this.defaultData = (data != null) ? Arrays.copyOf(data, data.length) : null;
+ return this;
+ }
+
private CuratorCacheBuilder(CuratorFramework client, String path)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index edd08b5..4b88bf4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.watch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -30,6 +29,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
@@ -38,11 +38,10 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
-import java.util.concurrent.atomic.AtomicInteger;
class InternalCuratorCache extends CuratorCacheBase implements Watcher
{
- private static final CachedNode nullNode = new CachedNode();
+ private final CachedNode nullNode;
private final Logger log = LoggerFactory.getLogger(getClass());
private final PersistentWatcher watcher;
private final CuratorFramework client;
@@ -64,9 +63,9 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
};
- InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren)
+ InternalCuratorCache(CuratorFramework client, String path, final CacheSelector cacheSelector, CachedNodeMap cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren, byte[] defaultData)
{
- super(path, cache, sendRefreshEvents);
+ super(path, cache, sendRefreshEvents, defaultData);
this.client = Objects.requireNonNull(client, "client cannot be null");
this.basePath = Objects.requireNonNull(path, "path cannot be null");
this.cacheSelector = Objects.requireNonNull(cacheSelector, "cacheSelector cannot be null");
@@ -84,6 +83,8 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
};
watcher.getListenable().addListener(this);
+
+ nullNode = new CachedNodeImpl(new Stat(), defaultData);
}
@Override
@@ -163,7 +164,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
if ( event.getType() == CuratorEventType.GET_DATA )
{
CacheAction cacheAction = (CacheAction)event.getContext();
- CachedNode newNode = new CachedNode(event.getStat(), event.getData());
+ CachedNode newNode = new CachedNodeImpl(event.getStat(), event.getData());
CachedNode oldNode = putNewNode(path, cacheAction, newNode);
if ( oldNode == null )
{
@@ -292,7 +293,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
case STAT_ONLY:
case UNCOMPRESSED_STAT_ONLY:
{
- putNode = new CachedNode(newNode.getStat());
+ putNode = new CachedNodeImpl(newNode.getStat(), defaultData);
break;
}
@@ -306,14 +307,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
return cache.put(path, putNode);
}
- private void decrementOutstanding(SettableFuture<Boolean> task, AtomicInteger outstandingCount)
- {
- if ( outstandingCount.decrementAndGet() <= 0 )
- {
- task.set(true);
- }
- }
-
private void remove(String path)
{
CachedNode removed = cache.remove(path);
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index dd880f6..5f8d750 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -31,6 +31,7 @@ import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
@@ -44,7 +45,7 @@ class InternalNodeCache extends CuratorCacheBase
private final String path;
private final CacheAction cacheAction;
private final AtomicBoolean isConnected = new AtomicBoolean(true);
- private static final CachedNode nullNode = new CachedNode();
+ private final CachedNode nullNode;
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
@@ -90,12 +91,13 @@ class InternalNodeCache extends CuratorCacheBase
}
};
- InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean sendRefreshEvents, boolean refreshOnStart)
+ InternalNodeCache(CuratorFramework client, String path, CacheAction cacheAction, CachedNodeMap cache, boolean sendRefreshEvents, boolean refreshOnStart, byte[] defaultData)
{
- super(path, cache, sendRefreshEvents);
+ super(path, cache, sendRefreshEvents, defaultData);
this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.cacheAction = cacheAction;
+ nullNode = new CachedNodeImpl(new Stat(), defaultData);
Preconditions.checkArgument(refreshOnStart, "refreshingWhenStarted() must be true when forSingleNode() is used");
}
@@ -156,7 +158,7 @@ class InternalNodeCache extends CuratorCacheBase
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
- CachedNode cachedNode = new CachedNode(event.getStat(), event.getData());
+ CachedNode cachedNode = new CachedNodeImpl(event.getStat(), event.getData());
setNewData(cachedNode);
}
break;
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index 9cbec98..7f1f547 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -88,6 +88,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
}
/**
+ * Construct a TreeCacheBridge that records exceptions and automatically listens.
+ */
+ protected TreeCacheBridge newTreeCacheBridgeWithListeners(CuratorFramework client, String path)
+ {
+ TreeCacheBridge result = TreeCache.newBuilder(client, path).buildBridge();
+ result.getListenable().addListener(eventListener);
+ return result;
+ }
+
+ /**
* Construct a CuratorCache that records exceptions and automatically listens using the bridge.
*/
protected CuratorCache newCacheWithListeners(CuratorFramework client, String path)
@@ -118,6 +128,16 @@ public class BaseTestTreeCache<T extends Closeable> extends BaseClassForTests
return result;
}
+ /**
+ * Finish constructing a TreeCacheBridge that records exceptions and automatically listens.
+ */
+ protected TreeCacheBridge buildBridgeWithListeners(TreeCache.Builder builder)
+ {
+ TreeCacheBridge result = builder.buildBridge();
+ result.getListenable().addListener(eventListener);
+ return result;
+ }
+
@Override
@BeforeMethod
public void setup() throws Exception
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
index 049daa5..614fa56 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridge.java
@@ -20,16 +20,17 @@
package org.apache.curator.framework.recipes.cache;
import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.recipes.watch.CacheSelectors;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
-import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.concurrent.Semaphore;
-public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
+@SuppressWarnings("deprecation")
+public class TestTreeCacheBridge extends BaseTestTreeCache<TreeCacheBridge>
{
@Test
public void testSelector() throws Exception
@@ -57,7 +58,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
return !fullPath.equals("/root/n1-c");
}
};
- cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/root").setSelector(selector));
cache.start();
assertEvent(Type.NODE_ADDED, "/root");
@@ -79,7 +80,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test/3", "three".getBytes());
client.create().forPath("/test/2/sub", "two-sub".getBytes());
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
@@ -89,16 +90,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
- Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
- Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
- Assert.assertNull(cache.get("/test/non_exist"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of("sub"));
+ Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty());
}
@Test
public void testStartEmpty() throws Exception
{
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.INITIALIZED);
@@ -110,7 +111,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
@Test
public void testStartEmptyDeeper() throws Exception
{
- cache = newCacheWithListeners(client, "/test/foo/bar");
+ cache = newTreeCacheBridgeWithListeners(client, "/test/foo/bar");
cache.start();
assertEvent(Type.INITIALIZED);
@@ -130,17 +131,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test/3", "three".getBytes());
client.create().forPath("/test/2/sub", "two-sub".getBytes());
- CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
- cache = buildCacheWithListeners(builder);
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(0));
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
- Assert.assertNull(cache.get("/test/1"));
- Assert.assertNull(cache.get("/test/1"));
- Assert.assertNull(cache.get("/test/non_exist"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.getCurrentData("/test/1"));
+ Assert.assertTrue(cache.getCurrentChildren("/test/1").isEmpty());
+ Assert.assertNull(cache.getCurrentData("/test/non_exist"));
}
@Test
@@ -152,8 +152,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test/3", "three".getBytes());
client.create().forPath("/test/2/sub", "two-sub".getBytes());
- CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
- cache = buildCacheWithListeners(builder);
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(1));
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
@@ -162,12 +161,12 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
- Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
- Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
- Assert.assertNull(cache.get("/test/1/sub"));
- Assert.assertNull(cache.get("/test/2/sub"));
- Assert.assertNull(cache.get("/test/non_exist"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.getCurrentData("/test/2/sub"));
+ Assert.assertTrue(cache.getCurrentChildren("/test/2/sub").isEmpty());
+ Assert.assertTrue(cache.getCurrentChildren("/test/non_exist").isEmpty());
}
@Test
@@ -181,8 +180,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test/foo/bar/3", "three".getBytes());
client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
- CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
- cache = buildCacheWithListeners(builder);
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test/foo/bar").setMaxDepth(1));
cache.start();
assertEvent(Type.NODE_ADDED, "/test/foo/bar");
assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
@@ -198,7 +196,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test");
client.create().forPath("/test/one", "hey there".getBytes());
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/one");
@@ -212,7 +210,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test");
client.create().forPath("/test/one", "hey there".getBytes());
- cache = newCacheWithListeners(client, "/");
+ cache = newTreeCacheBridgeWithListeners(client, "/");
cache.start();
assertEvent(Type.NODE_ADDED, "/");
assertEvent(Type.NODE_ADDED, "/test");
@@ -220,10 +218,10 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
- Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
}
@Test
@@ -232,18 +230,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test");
client.create().forPath("/test/one", "hey there".getBytes());
- CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
- cache = buildCacheWithListeners(builder);
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/").setMaxDepth(1));
cache.start();
assertEvent(Type.NODE_ADDED, "/");
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
- Assert.assertNull(cache.get("/test/one"));
- Assert.assertNull(cache.get("/test/one"));
+ Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.getCurrentData("/test/one"));
+ Assert.assertTrue(cache.getCurrentChildren("/test/one").isEmpty());
}
@Test
@@ -254,16 +251,16 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/outer/test");
client.create().forPath("/outer/test/one", "hey there".getBytes());
- cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+ cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/one");
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
- Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
}
@Test
@@ -274,7 +271,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/outer/test");
client.create().forPath("/outer/test/one", "hey there".getBytes());
- cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+ cache = newTreeCacheBridgeWithListeners(client.usingNamespace("outer"), "/");
cache.start();
assertEvent(Type.NODE_ADDED, "/");
assertEvent(Type.NODE_ADDED, "/foo");
@@ -282,17 +279,17 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
assertEvent(Type.NODE_ADDED, "/test/one");
assertEvent(Type.INITIALIZED);
assertNoMoreEvents();
- Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test"));
- Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
- Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertEquals(cache.getCurrentChildren("/").keySet(), ImmutableSet.of("foo", "test"));
+ Assert.assertEquals(cache.getCurrentChildren("/foo").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
}
@Test
public void testSyncInitialPopulation() throws Exception
{
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.INITIALIZED);
@@ -311,7 +308,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test/2", "2".getBytes());
client.create().forPath("/test/3", "3".getBytes());
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/1");
@@ -326,7 +323,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
{
client.create().forPath("/test");
- cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(client, "/test").setCacheData(false));
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
@@ -338,9 +335,9 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
assertEvent(Type.NODE_UPDATED, "/test/foo");
assertNoMoreEvents();
- Assert.assertNotNull(cache.get("/test/foo"));
+ Assert.assertNotNull(cache.getCurrentData("/test/foo"));
// No byte data querying the tree because we're not caching data.
- Assert.assertEquals(cache.get("/test/foo").getData().length, 0);
+ Assert.assertNull(cache.getCurrentData("/test/foo").getData());
}
@Test
@@ -349,7 +346,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test");
client.create().forPath("/test/foo", "one".getBytes());
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.NODE_ADDED, "/test/foo");
@@ -374,7 +371,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
client.create().forPath("/test");
client.create().forPath("/test/foo", "one".getBytes());
- cache = newCacheWithListeners(client, "/test/foo");
+ cache = newTreeCacheBridgeWithListeners(client, "/test/foo");
cache.start();
assertEvent(Type.NODE_ADDED, "/test/foo");
assertEvent(Type.INITIALIZED);
@@ -397,7 +394,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
{
client.create().forPath("/test");
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
@@ -421,47 +418,103 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
{
client.create().forPath("/test");
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
- Assert.assertNull(cache.get("/t"));
- Assert.assertNull(cache.get("/testing"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+ Assert.assertTrue(cache.getCurrentChildren("/t").isEmpty());
+ Assert.assertTrue(cache.getCurrentChildren("/testing").isEmpty());
client.create().forPath("/test/one", "hey there".getBytes());
assertEvent(Type.NODE_ADDED, "/test/one");
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
- Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
- Assert.assertNull(cache.get("/test/o"));
- Assert.assertNull(cache.get("/test/onely"));
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+ Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertTrue(cache.getCurrentChildren("/test/o").isEmpty());
+ Assert.assertTrue(cache.getCurrentChildren("/test/onely").isEmpty());
client.setData().forPath("/test/one", "sup!".getBytes());
assertEvent(Type.NODE_UPDATED, "/test/one");
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
client.delete().forPath("/test/one");
assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
- Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
assertNoMoreEvents();
}
@Test
+ public void testBasicsOnTwoCaches() throws Exception
+ {
+ TreeCacheBridge cache2 = newTreeCacheBridgeWithListeners(client, "/test");
+ cache2.getListenable().removeListener(eventListener); // Don't listen on the second cache.
+
+ // Just ensures the same event count; enables test flow control on cache2.
+ final Semaphore semaphore = new Semaphore(0);
+ cache2.getListenable().addListener(new TreeCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+ {
+ semaphore.release();
+ }
+ });
+
+ try
+ {
+ client.create().forPath("/test");
+
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
+ cache.start();
+ cache2.start();
+
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ semaphore.acquire(2);
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+ semaphore.acquire();
+ Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "hey there");
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ assertEvent(Type.NODE_UPDATED, "/test/one");
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+ semaphore.acquire();
+ Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+ Assert.assertNull(cache.getCurrentData("/test/one"));
+ semaphore.acquire();
+ Assert.assertNull(cache2.getCurrentData("/test/one"));
+
+ assertNoMoreEvents();
+ Assert.assertEquals(semaphore.availablePermits(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache2);
+ }
+ }
+
+ @Test
public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
{
client.create().forPath("/test");
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertEvent(Type.NODE_ADDED, "/test");
assertEvent(Type.INITIALIZED);
client.create().forPath("/test/one", "hey there".getBytes());
assertEvent(Type.NODE_ADDED, "/test/one");
- Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
cache.close();
assertNoMoreEvents();
@@ -484,7 +537,7 @@ public class TestTreeCacheBridge extends BaseTestTreeCache<CuratorCache>
initCuratorFramework();
// Start the client disconnected.
- cache = newCacheWithListeners(client, "/test");
+ cache = newTreeCacheBridgeWithListeners(client, "/test");
cache.start();
assertNoMoreEvents();
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
index f304c24..140ee64 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeRandomTree.java
@@ -21,8 +21,6 @@ package org.apache.curator.framework.recipes.cache;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.watch.CachedNode;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
import org.apache.curator.utils.ZKPaths;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -31,7 +29,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCache>
+@SuppressWarnings("deprecation")
+public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<TreeCacheBridge>
{
/**
* A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}
@@ -57,22 +56,39 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
private final Random random = new Random();
private boolean withDepth = false;
+ @Test
+ public void testGiantRandomDeepTree() throws Exception {
+ doTestGiantRandomDeepTree();
+ }
+
+ @Test
+ public void testGiantRandomDeepTreeWithDepth() throws Exception {
+ withDepth = true;
+ doTestGiantRandomDeepTree();
+ }
+
/**
* Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
* a TreeCache to follow the changes. At each step, assert that TreeCache matches our
* source-of-truth test data, and that we see exactly the set of events we expect to see.
*/
-
- @Test
- public void testGiantRandomDeepTree() throws Exception {
+ private void doTestGiantRandomDeepTree() throws Exception
+ {
client.create().forPath("/tree", null);
CuratorFramework cl = client.usingNamespace("tree");
- cache = newCacheWithListeners(cl, "/");
+ if ( withDepth )
+ {
+ cache = buildBridgeWithListeners(TreeCache.newBuilder(cl, "/").setMaxDepth(TEST_DEPTH));
+ }
+ else
+ {
+ cache = newTreeCacheBridgeWithListeners(cl, "/");
+ }
cache.start();
assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
assertEvent(TreeCacheEvent.Type.INITIALIZED);
- TestNode root = new TestNode("/", new byte[0]);
+ TestNode root = new TestNode("/", null);
int maxDepth = 0;
int adds = 0;
int removals = 0;
@@ -169,7 +185,7 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
}
// Each iteration, ensure the cached state matches our source-of-truth tree.
- assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root);
+ assertNodeEquals(cache.getCurrentData("/"), root);
assertTreeEquals(cache, root, 0);
}
@@ -190,10 +206,10 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
/**
* Recursively assert that current children equal expected children.
*/
- private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+ private void assertTreeEquals(TreeCacheBridge cache, TestNode expectedNode, int depth)
{
String path = expectedNode.fullPath;
- Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+ Map<String, ChildData> cacheChildren = cache.getCurrentChildren(path);
Assert.assertNotNull(cacheChildren, path);
if (withDepth && depth == TEST_DEPTH) {
@@ -205,9 +221,9 @@ public class TestTreeCacheBridgeRandomTree extends BaseTestTreeCache<CuratorCach
for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
{
String nodeName = entry.getKey();
- CachedNode childData = cacheChildren.get(nodeName);
+ ChildData childData = cacheChildren.get(nodeName);
TestNode expectedChild = entry.getValue();
- assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+ assertNodeEquals(childData, expectedChild);
assertTreeEquals(cache, expectedChild, depth + 1);
}
}
[3/8] curator git commit: typo - both cache selectors were compressed
Posted by ra...@apache.org.
typo - both cache selectors were compressed
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/570023df
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/570023df
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/570023df
Branch: refs/heads/persistent-watch
Commit: 570023df19106f5ce6895490fe32cbf8be38ac5e
Parents: bf7c5ec
Author: randgalt <ra...@apache.org>
Authored: Thu Aug 24 06:22:07 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Thu Aug 24 06:22:07 2017 +0200
----------------------------------------------------------------------
.../apache/curator/x/async/modeled/details/ModeledCacheImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/570023df/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index ce73a9b..2684633 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -74,7 +74,7 @@ class ModeledCacheImpl<T> implements CacheListener, ModeledCache<T>
this.serializer = modelSpec.serializer();
boolean dataIsCompressed = modelSpec.createOptions().contains(CreateOption.compress);
cache = CuratorCacheBuilder.builder(client, basePath.fullPath())
- .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.getUncompressedStatOnly())
+ .withCacheSelector(dataIsCompressed ? CacheSelectors.uncompressedStatAndData() : CacheSelectors.statAndData())
.sendingRefreshEvents(true)
.build();
}
[4/8] curator git commit: Abstracted the TreeCache public API and
then added an alternate implementation that uses the new CuratorCache instead of
TreeCache. This should make porting older code much easier.
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
new file mode 100644
index 0000000..cfb897d
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapper.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.recipes.watch.CacheSelectors;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
+import org.apache.curator.test.compatibility.KillSession2;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCacheBridgeWrapper extends BaseTestTreeCache<CuratorCache>
+{
+ @Test
+ public void testSelector() throws Exception
+ {
+ client.create().forPath("/root");
+ client.create().forPath("/root/n1-a");
+ client.create().forPath("/root/n1-b");
+ client.create().forPath("/root/n1-b/n2-a");
+ client.create().forPath("/root/n1-b/n2-b");
+ client.create().forPath("/root/n1-b/n2-b/n3-a");
+ client.create().forPath("/root/n1-c");
+ client.create().forPath("/root/n1-d");
+
+ TreeCacheSelector selector = new TreeCacheSelector()
+ {
+ @Override
+ public boolean traverseChildren(String fullPath)
+ {
+ return !fullPath.equals("/root/n1-b/n2-b");
+ }
+
+ @Override
+ public boolean acceptChild(String fullPath)
+ {
+ return !fullPath.equals("/root/n1-c");
+ }
+ };
+ cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheSelector(SelectorBridge.wrap(selector)));
+ cache.start();
+
+ assertEvent(Type.NODE_ADDED, "/root");
+ assertEvent(Type.NODE_ADDED, "/root/n1-a");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b");
+ assertEvent(Type.NODE_ADDED, "/root/n1-d");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-a");
+ assertEvent(Type.NODE_ADDED, "/root/n1-b/n2-b");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartup() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of("sub"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testStartEmpty() throws Exception
+ {
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testStartEmptyDeeper() throws Exception
+ {
+ cache = newCacheWithListeners(client, "/test/foo/bar");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().creatingParentsIfNeeded().forPath("/test/foo");
+ assertNoMoreEvents();
+ client.create().forPath("/test/foo/bar");
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDepth0() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(0));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/1"));
+ Assert.assertNull(cache.get("/test/1"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testDepth1() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/1", "one".getBytes());
+ client.create().forPath("/test/2", "two".getBytes());
+ client.create().forPath("/test/3", "three".getBytes());
+ client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/3", "three".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("1", "2", "3"));
+ Assert.assertEquals(cache.childrenAtPath("/test/1").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test/2").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/1/sub"));
+ Assert.assertNull(cache.get("/test/2/sub"));
+ Assert.assertNull(cache.get("/test/non_exist"));
+ }
+
+ @Test
+ public void testDepth1Deeper() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo");
+ client.create().forPath("/test/foo/bar");
+ client.create().forPath("/test/foo/bar/1", "one".getBytes());
+ client.create().forPath("/test/foo/bar/2", "two".getBytes());
+ client.create().forPath("/test/foo/bar/3", "three".getBytes());
+ client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/test/foo/bar").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar");
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes());
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testAsyncInitialPopulation() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testFromRoot() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client, "/");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testFromRootWithDepth() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, "/").withCacheSelector(CacheSelectors.maxDepth(1));
+ cache = buildCacheWithListeners(builder);
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertTrue(cache.childrenAtPath("/").keySet().contains("test"));
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/one"));
+ Assert.assertNull(cache.get("/test/one"));
+ }
+
+ @Test
+ public void testWithNamespace() throws Exception
+ {
+ client.create().forPath("/outer");
+ client.create().forPath("/outer/foo");
+ client.create().forPath("/outer/test");
+ client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client.usingNamespace("outer"), "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testWithNamespaceAtRoot() throws Exception
+ {
+ client.create().forPath("/outer");
+ client.create().forPath("/outer/foo");
+ client.create().forPath("/outer/test");
+ client.create().forPath("/outer/test/one", "hey there".getBytes());
+
+ cache = newCacheWithListeners(client.usingNamespace("outer"), "/");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/");
+ assertEvent(Type.NODE_ADDED, "/foo");
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ Assert.assertEquals(cache.childrenAtPath("/").keySet(), ImmutableSet.of("foo", "test"));
+ Assert.assertEquals(cache.childrenAtPath("/foo").keySet(), ImmutableSet.of());
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ }
+
+ @Test
+ public void testSyncInitialPopulation() throws Exception
+ {
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testChildrenInitialized() throws Exception
+ {
+ client.create().forPath("/test", "".getBytes());
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+ client.create().forPath("/test/3", "3".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/1");
+ assertEvent(Type.NODE_ADDED, "/test/2");
+ assertEvent(Type.NODE_ADDED, "/test/3");
+ assertEvent(Type.INITIALIZED);
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testUpdateWhenNotCachingData() throws Exception
+ {
+ client.create().forPath("/test");
+
+ cache = buildCacheWithListeners(CuratorCacheBuilder.builder(client, "/test").withCacheSelector(CacheSelectors.statOnly()));
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/foo", "first".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ assertEvent(Type.NODE_UPDATED, "/test/foo");
+ assertNoMoreEvents();
+
+ Assert.assertNotNull(cache.get("/test/foo"));
+ // No byte data querying the tree because we're not caching data.
+ Assert.assertEquals(cache.get("/test/foo").getData().length, 0);
+ }
+
+ @Test
+ public void testDeleteThenCreate() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ assertEvent(Type.INITIALIZED);
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo", "one".getBytes());
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo", "two".getBytes());
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDeleteThenCreateRoot() throws Exception
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ cache = newCacheWithListeners(client, "/test/foo");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ assertEvent(Type.INITIALIZED);
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo");
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ client.delete().forPath("/test/foo");
+ assertEvent(Type.NODE_REMOVED, "/test/foo");
+ client.create().forPath("/test/foo", "two".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testKilledSession() throws Exception
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/foo", "foo".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/foo");
+ client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/me");
+
+ KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+ assertEvent(Type.CONNECTION_LOST);
+ assertEvent(Type.CONNECTION_RECONNECTED);
+ assertEvent(Type.INITIALIZED);
+ assertEvent(Type.NODE_REMOVED, "/test/me", "data".getBytes());
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testBasics() throws Exception
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/t"));
+ Assert.assertNull(cache.get("/testing"));
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+ Assert.assertEquals(cache.childrenAtPath("/test/one").keySet(), ImmutableSet.of());
+ Assert.assertNull(cache.get("/test/o"));
+ Assert.assertNull(cache.get("/test/onely"));
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ assertEvent(Type.NODE_UPDATED, "/test/one");
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of("one"));
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ assertEvent(Type.NODE_REMOVED, "/test/one", "sup!".getBytes());
+ Assert.assertEquals(cache.childrenAtPath("/test").keySet(), ImmutableSet.of());
+
+ assertNoMoreEvents();
+ }
+
+ @Test
+ public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception
+ {
+ client.create().forPath("/test");
+
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ assertEvent(Type.NODE_ADDED, "/test/one");
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+
+ cache.close();
+ assertNoMoreEvents();
+
+ client.delete().forPath("/test/one");
+ assertNoMoreEvents();
+ }
+
+ /**
+ * Make sure TreeCache gets to a sane state when we can't initially connect to server.
+ */
+ @Test
+ public void testServerNotStartedYet() throws Exception
+ {
+ // Stop the existing server.
+ server.stop();
+
+ // Shutdown the existing client and re-create it started.
+ client.close();
+ initCuratorFramework();
+
+ // Start the client disconnected.
+ cache = newCacheWithListeners(client, "/test");
+ cache.start();
+ assertNoMoreEvents();
+
+ // Now restart the server.
+ server.restart();
+ assertEvent(Type.INITIALIZED);
+
+ client.create().forPath("/test");
+
+ assertEvent(Type.NODE_ADDED, "/test");
+ assertNoMoreEvents();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/02073a71/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
new file mode 100644
index 0000000..03d0999
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheBridgeWrapperRandomTree.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.watch.CachedNode;
+import org.apache.curator.framework.recipes.watch.CuratorCache;
+import org.apache.curator.utils.ZKPaths;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestTreeCacheBridgeWrapperRandomTree extends BaseTestTreeCache<CuratorCache>
+{ // Randomized add/update/remove test that compares a CuratorCache against an in-memory model tree.
+ /**
+ * A randomly generated source-of-truth node for {@link #testGiantRandomDeepTree()}: its path, data bytes and children keyed by node name.
+ */
+ private static final class TestNode
+ {
+ String fullPath;
+ byte[] data;
+ Map<String, TestNode> children = new HashMap<String, TestNode>();
+
+ TestNode(String fullPath, byte[] data)
+ {
+ this.fullPath = fullPath;
+ this.data = data;
+ }
+ }
+
+ // These constants will produce a tree about 10 levels deep.
+ private static final int ITERATIONS = 1000; // number of random operations performed by the test loop
+ private static final double DIVE_CHANCE = 0.9; // chance of descending one more level while selecting a node
+ private static final int TEST_DEPTH = 5; // depth limit; only consulted when withDepth is true
+
+ private final Random random = new Random(); // unseeded, so every run builds a different tree
+ private boolean withDepth = false; // not reassigned anywhere in this class, so the depth limit is inactive here
+
+ /**
+ * Randomly construct a large tree of test data in memory, mirror it into ZK, and then use
+ * a CuratorCache to follow the changes. At each step, assert that the cache matches our
+ * source-of-truth test data, and that we see exactly the set of events we expect to see.
+ */
+
+ @Test
+ public void testGiantRandomDeepTree() throws Exception {
+ client.create().forPath("/tree", null); // node backing the "tree" namespace used below
+ CuratorFramework cl = client.usingNamespace("tree");
+ cache = newCacheWithListeners(cl, "/");
+ cache.start();
+ assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/");
+ assertEvent(TreeCacheEvent.Type.INITIALIZED);
+
+ TestNode root = new TestNode("/", new byte[0]);
+ int maxDepth = 0;
+ int adds = 0;
+ int removals = 0;
+ int updates = 0;
+
+ for ( int i = 0; i < ITERATIONS; ++i )
+ {
+ // Select a node to update, randomly navigate down through the tree
+ int depth = 0;
+ TestNode last = null; // parent of 'node' once we have descended at least one level
+ TestNode node = root;
+ while ( !node.children.isEmpty() && random.nextDouble() < DIVE_CHANCE )
+ {
+ // Go down a level in the tree. Select a random child for the next iteration.
+ last = node;
+ node = Iterables.get(node.children.values(), random.nextInt(node.children.size()));
+ ++depth;
+ }
+ maxDepth = Math.max(depth, maxDepth);
+
+ // Okay we found a node, let's do something interesting with it.
+ switch ( random.nextInt(3) )
+ {
+ case 0:
+ // Try a removal if we have no children and we're not the root node.
+ if ( node != root && node.children.isEmpty() )
+ {
+ // Delete myself from parent ('last' is non-null here because node != root means we descended).
+ TestNode removed = last.children.remove(ZKPaths.getNodeFromPath(node.fullPath));
+ Assert.assertSame(node, removed);
+
+ // Delete from ZK
+ cl.delete().forPath(node.fullPath);
+
+ // The cache should see the delete (not asserted when shouldSeeEventAt() is false).
+ if (shouldSeeEventAt(node.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_REMOVED, node.fullPath);
+ }
+ ++removals;
+ }
+ break;
+ case 1:
+ // Do an update.
+ byte[] newData = new byte[10];
+ random.nextBytes(newData);
+
+ if ( Arrays.equals(node.data, newData) )
+ {
+ // Randomly generated the same data! Very small chance, just skip.
+ continue; // continues the outer for loop, so this iteration's tree comparison is skipped as well
+ }
+
+ // Update source-of-truth.
+ node.data = newData;
+
+ // Update in ZK.
+ cl.setData().forPath(node.fullPath, node.data);
+
+ // The cache should see the update (not asserted when shouldSeeEventAt() is false).
+ if (shouldSeeEventAt(node.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_UPDATED, node.fullPath, node.data);
+ }
+
+ ++updates;
+ break;
+ case 2:
+ // Add a new child.
+ String name = Long.toHexString(random.nextLong());
+ if ( node.children.containsKey(name) )
+ {
+ // Randomly generated the same name! Very small chance, just skip.
+ continue; // continues the outer for loop, so this iteration's tree comparison is skipped as well
+ }
+
+ // Add a new child to our test tree.
+ byte[] data = new byte[10];
+ random.nextBytes(data);
+ TestNode child = new TestNode(ZKPaths.makePath(node.fullPath, name), data);
+ node.children.put(name, child);
+
+ // Add to ZK.
+ cl.create().forPath(child.fullPath, child.data);
+
+ // The cache should see the add (not asserted when shouldSeeEventAt() is false).
+ if (shouldSeeEventAt(child.fullPath))
+ {
+ assertEvent(TreeCacheEvent.Type.NODE_ADDED, child.fullPath, child.data);
+ }
+
+ ++adds;
+ break;
+ }
+
+ // Each iteration, ensure the cached state matches our source-of-truth tree.
+ assertNodeEquals(ListenerBridge.toData("/", cache.get("/")), root);
+ assertTreeEquals(cache, root, 0);
+ }
+
+ // Typical stats for this test: maxDepth: 10, adds: 349, removals: 198, updates: 320
+ // We get more adds than removals because removals only happen if we're at a leaf.
+ System.out.println(String.format("maxDepth: %s, adds: %s, removals: %s, updates: %s", maxDepth, adds, removals, updates));
+ assertNoMoreEvents();
+ }
+
+ /**
+ * Returns true if we should see an event at this path (always when withDepth is false, otherwise only within TEST_DEPTH path segments), false otherwise.
+ */
+ private boolean shouldSeeEventAt(String fullPath)
+ {
+ return !withDepth || ZKPaths.split(fullPath).size() <= TEST_DEPTH;
+ }
+
+ /**
+ * Recursively assert that current children equal expected children; when withDepth is set, comparison stops at depth TEST_DEPTH.
+ */
+ private void assertTreeEquals(CuratorCache cache, TestNode expectedNode, int depth)
+ {
+ String path = expectedNode.fullPath;
+ Map<String, CachedNode> cacheChildren = cache.childrenAtPath(path);
+ Assert.assertNotNull(cacheChildren, path); // 'path' is passed as the assertion message
+
+ if (withDepth && depth == TEST_DEPTH) {
+ return;
+ }
+
+ Assert.assertEquals(cacheChildren.keySet(), expectedNode.children.keySet(), path);
+
+ for ( Map.Entry<String, TestNode> entry : expectedNode.children.entrySet() )
+ {
+ String nodeName = entry.getKey();
+ CachedNode childData = cacheChildren.get(nodeName);
+ TestNode expectedChild = entry.getValue();
+ assertNodeEquals(ListenerBridge.toData(expectedChild.fullPath, childData), expectedChild);
+ assertTreeEquals(cache, expectedChild, depth + 1);
+ }
+ }
+
+ /**
+ * Assert that the given node data is non-null and that its data bytes match the expected test node data.
+ */
+ private static void assertNodeEquals(ChildData actualChild, TestNode expectedNode)
+ {
+ String path = expectedNode.fullPath;
+ Assert.assertNotNull(actualChild, path);
+ Assert.assertEquals(actualChild.getData(), expectedNode.data, path);
+ }
+}
[6/8] curator git commit: doc updates
Posted by ra...@apache.org.
doc updates
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9c1186be
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9c1186be
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9c1186be
Branch: refs/heads/persistent-watch
Commit: 9c1186bebbbb5f65f18c942824e85051bddf8c19
Parents: 02073a7
Author: randgalt <ra...@apache.org>
Authored: Sun Aug 27 19:21:16 2017 +0200
Committer: randgalt <ra...@apache.org>
Committed: Sun Aug 27 19:21:16 2017 +0200
----------------------------------------------------------------------
.../apache/curator/framework/recipes/cache/TreeCache.java | 9 ++++++++-
.../curator/framework/recipes/cache/TreeCacheBridge.java | 4 ++--
2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9c1186be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 4e663cd..b411220 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -73,7 +73,9 @@ import static org.apache.curator.utils.PathUtils.validatePath;
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
*
- * @deprecated use {@link CuratorCache}
+ * @deprecated use {@link CuratorCache} - to help in upgrading from old code, you can
+ * use {@link org.apache.curator.framework.recipes.cache.TreeCache.Builder#buildBridge()} to build an instance
+ * that roughly matches the TreeCache API but is backed by a CuratorCache
*/
@Deprecated
public class TreeCache implements TreeCacheBridge
@@ -112,6 +114,11 @@ public class TreeCache implements TreeCacheBridge
return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
}
+ /**
+ * Builds a TreeCacheBridge instance backed by {@link org.apache.curator.framework.recipes.watch.CuratorCache}
+ *
+ * @return instance
+ */
public TreeCacheBridge buildBridge()
{
CuratorCacheBuilder builder = CuratorCacheBuilder.builder(client, path);
http://git-wip-us.apache.org/repos/asf/curator/blob/9c1186be/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
index 9c383af..8b6f37a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCacheBridge.java
@@ -30,10 +30,10 @@ public interface TreeCacheBridge extends Closeable
/**
* Return the current set of children at the given path, mapped by child name. There are no
* guarantees of accuracy; this is merely the most recent view of the data. If there is no
- * node at this path, {@code null} is returned.
+ * node at this path, an empty map or {@code null} is returned (depending on implementation).
*
* @param fullPath full path to the node to check
- * @return a possibly-empty list of children if the node is alive, or null
+ * @return a possibly-empty map of children if the node is alive, or null (depending on implementation)
*/
Map<String, ChildData> getCurrentChildren(String fullPath);