You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/12 14:58:05 UTC
[curator] 03/03: CURATOR-549
This is an automated email from the ASF dual-hosted git repository.
randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 17ca4a39811efdf1b14b938773bc2d465ae1d049
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500
CURATOR-549
Creates a simple bridge that, when using ZK 3.6.0 creates a CuratorCache, and for earlier versions creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
---
.../cache/CompatibleCuratorCacheBridge.java | 126 +++++++++++++++++
.../recipes/cache/CuratorCacheBridge.java | 59 ++++++++
.../recipes/cache/CuratorCacheBuilder.java | 8 ++
.../recipes/cache/CuratorCacheBuilderImpl.java | 19 +++
.../framework/recipes/cache/CuratorCacheImpl.java | 9 +-
.../recipes/cache/CuratorCacheListenerBuilder.java | 22 ++-
.../cache/CuratorCacheListenerBuilderImpl.java | 19 ++-
.../recipes/cache/CuratorCacheStorage.java | 1 +
.../cache/PathChildrenCacheListenerWrapper.java | 11 +-
.../curator/framework/recipes/cache/TreeCache.java | 36 +++--
.../framework/recipes/nodes/GroupMember.java | 43 +++---
.../recipes/cache/TestCuratorCacheWrappers.java | 2 +-
.../framework/recipes/nodes/TestGroupMember.java | 2 +
curator-test-zk35/pom.xml | 46 ++++++
curator-x-async/pom.xml | 15 ++
.../details/CachedModeledFrameworkImpl.java | 11 +-
.../x/async/modeled/details/ModeledCacheImpl.java | 77 +++++++----
.../modeled/details/ModeledFrameworkImpl.java | 4 +-
.../async/modeled/TestCachedModeledFramework.java | 2 +
.../x/async/modeled/TestModeledFrameworkBase.java | 4 +-
curator-x-discovery/pom.xml | 15 ++
.../apache/curator/x/discovery/ServiceCache.java | 20 ++-
.../curator/x/discovery/ServiceProvider.java | 24 +++-
.../discovery/details/ServiceCacheBuilderImpl.java | 6 +-
.../x/discovery/details/ServiceCacheImpl.java | 154 ++++++++++++---------
.../x/discovery/details/ServiceDiscoveryImpl.java | 65 ++++-----
.../x/discovery/details/ServiceProviderImpl.java | 8 ++
.../curator/x/discovery/TestServiceCache.java | 2 +
.../x/discovery/details/TestServiceCacheRace.java | 2 +
.../x/discovery/details/TestServiceDiscovery.java | 2 +
.../x/discovery/details/TestWatchedInstances.java | 2 +
pom.xml | 14 ++
32 files changed, 632 insertions(+), 198 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..f7c1428
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.stream.Stream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener
+{
+ private final TreeCache cache;
+ private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+
+ CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor, boolean cacheData)
+ {
+ Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+ TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData);
+ if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+ {
+ builder.setMaxDepth(0);
+ }
+ if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+ {
+ builder.setDataIsCompressed(true);
+ }
+ if ( executor != null )
+ {
+ builder.setExecutor(executor);
+ }
+ cache = builder.build();
+ }
+
+ @Override
+ public void start()
+ {
+ try
+ {
+ cache.getListenable().addListener(this);
+
+ cache.start();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ cache.close();
+ }
+
+ @Override
+ public Listenable<CuratorCacheListener> listenable()
+ {
+ return listenerManager;
+ }
+
+ @Override
+ public Stream<ChildData> streamImmediateChildren(String fromParent)
+ {
+ Map<String, ChildData> currentChildren = cache.getCurrentChildren(fromParent);
+ if ( currentChildren == null )
+ {
+ return Stream.empty();
+ }
+ return currentChildren.values().stream();
+ }
+
+ @Override
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+ {
+ switch ( event.getType() )
+ {
+ case NODE_ADDED:
+ {
+ listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
+ break;
+ }
+
+ case NODE_REMOVED:
+ {
+ listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
+ break;
+ }
+
+ case NODE_UPDATED:
+ {
+ listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData()));
+ break;
+ }
+
+ case INITIALIZED:
+ {
+ listenerManager.forEach(CuratorCacheListener::initialized);
+ break;
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..3e329fc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+/**
+ * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheBridge extends Closeable
+{
+ /**
+ * Start the cache. This will cause a complete refresh from the cache's root node and generate
+ * events for all nodes found, etc.
+ */
+ void start();
+
+ /**
+ * Close the cache, stop responding to events, etc.
+ */
+ @Override
+ void close();
+
+ /**
+ * Return the listener container so that listeners can be registered to be notified of changes to the cache
+ *
+ * @return listener container
+ */
+ Listenable<CuratorCacheListener> listenable();
+
+ /**
+ * Return a stream over the storage entries that are the immediate children of the given node.
+ *
+ * @param fromParent the parent node - determines the children returned in the stream
+ * @return stream over entries
+ */
+ Stream<ChildData> streamImmediateChildren(String fromParent);
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..a249d60 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,12 @@ public interface CuratorCacheBuilder
* @return new Curator Cache
*/
CuratorCache build();
+
+ /**
+ * Return a new bridge cache based on the builder methods that have been called.
+ *
+ * @param cacheData if true, keep the data bytes cached. If false, clear them after sending notifications
+ * @return new bridge cache
+ */
+ CuratorCacheBridge buildBridge(boolean cacheData);
}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..8d7fa4b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
package org.apache.curator.framework.recipes.cache;
+import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
@Override
public CuratorCache build()
{
+ return internalBuild(storage);
+ }
+
+ @Override
+ public CuratorCacheBridge buildBridge(boolean cacheData)
+ {
+ Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge");
+ if ( Compatibility.hasPersistentWatchers() )
+ {
+ return internalBuild(cacheData ? null : CuratorCacheStorage.bytesNotCached());
+ }
+ Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge");
+ return new CompatibleCuratorCacheBridge(client, path, options, executor, cacheData);
+ }
+
+ private CuratorCacheImpl internalBuild(CuratorCacheStorage storage)
+ {
return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
}
}
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index e6be71c..01faa39 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -40,12 +40,13 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Stream;
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -113,6 +114,12 @@ class CuratorCacheImpl implements CuratorCache
}
@Override
+ public Stream<ChildData> streamImmediateChildren(String fromParent)
+ {
+ return storage.streamImmediateChildren(fromParent);
+ }
+
+ @Override
public Listenable<CuratorCacheListener> listenable()
{
return listenerManager;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
index c57e881..9381546 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
import java.util.function.Consumer;
+import java.util.function.Predicate;
public interface CuratorCacheListenerBuilder
{
@@ -88,9 +89,11 @@ public interface CuratorCacheListenerBuilder
*
* @param client the curator client
* @param listener the listener to wrap
- * @return a CuratorCacheListener that forwards to the given listener
+ * @param basePath the path used as the root in the cache. Only events with a parent path matching this
+ * base path are sent to the listener
+ * @return this
*/
- CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+ CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath);
/**
* Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
@@ -101,7 +104,7 @@ public interface CuratorCacheListenerBuilder
*
* @param client the curator client
* @param listener the listener to wrap
- * @return a CuratorCacheListener that forwards to the given listener
+ * @return this
*/
CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
@@ -110,17 +113,28 @@ public interface CuratorCacheListenerBuilder
* with CuratorCache.
*
* @param listener the listener to wrap
- * @return a CuratorCacheListener that forwards to the given listener
+ * @return this
*/
CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
/**
* Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
* i.e. changes that occur as the cache is initializing are not sent to the listener
+ *
+ * @return this
*/
CuratorCacheListenerBuilder afterInitialized();
/**
+ * Make the built listener so that it is only called for paths that return true when applied
+ * to the given filter.
+ *
+ * @param pathFilter path filter
+ * @return this
+ */
+ CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter);
+
+ /**
* Build and return a new listener based on the methods that have been previously called
*
* @return new listener
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
index 4873868..479ba34 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -20,14 +20,17 @@
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
+import java.util.function.Predicate;
class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
{
private final List<CuratorCacheListener> listeners = new ArrayList<>();
private boolean afterInitializedOnly = false;
+ private Predicate<String> pathFilter = __ -> true;
@Override
public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
@@ -106,8 +109,9 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
}
@Override
- public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+ public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath)
{
+ pathFilter = p -> ZKPaths.getPathAndNode(p).getPath().equals(basePath);
listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
return this;
}
@@ -134,6 +138,13 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
}
@Override
+ public CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter)
+ {
+ this.pathFilter = (pathFilter != null) ? (p -> (p == null) || pathFilter.test(p)) : (__ -> true);
+ return this;
+ }
+
+ @Override
public CuratorCacheListener build()
{
List<CuratorCacheListener> copy = new ArrayList<>(listeners);
@@ -146,7 +157,11 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
{
if ( isInitialized )
{
- copy.forEach(l -> l.event(type, oldData, data));
+ ChildData filterData = (data != null) ? data : oldData;
+ if ( pathFilter.test(filterData.getPath()) )
+ {
+ copy.forEach(l -> l.event(type, oldData, data));
+ }
}
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index e809263..30f2b81 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -92,6 +92,7 @@ public interface CuratorCacheStorage
/**
* Return a stream over the storage entries that are the immediate children of the given node.
*
+ * @param fromParent the parent node - determines the children returned in the stream
* @return stream over entries
*/
Stream<ChildData> streamImmediateChildren(String fromParent);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
{
case NODE_CREATED:
{
- sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
break;
}
case NODE_CHANGED:
{
- sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
break;
}
case NODE_DELETED:
{
- sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
break;
}
}
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
@Override
public void initialized()
{
- sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+ sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
}
- private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+ private void sendEvent(PathChildrenCacheEvent event)
{
- PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
try
{
listener.childEvent(client, event);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index e2f3a8b..121e72c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
public class TreeCache implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+ private final Executor executor;
private final boolean createParentNodes;
private final boolean disableZkWatches;
private final TreeCacheSelector selector;
@@ -89,6 +91,7 @@ public class TreeCache implements Closeable
private boolean cacheData = true;
private boolean dataIsCompressed = false;
private ExecutorService executorService = null;
+ private Executor executor = null;
private int maxDepth = Integer.MAX_VALUE;
private boolean createParentNodes = false;
private boolean disableZkWatches = false;
@@ -105,12 +108,12 @@ public class TreeCache implements Closeable
*/
public TreeCache build()
{
- ExecutorService executor = executorService;
- if ( executor == null )
+ ExecutorService localExecutorService = executorService;
+ if ( (localExecutorService == null) && (executor == null) )
{
- executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
+ localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory);
}
- return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
+ return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector);
}
/**
@@ -149,6 +152,15 @@ public class TreeCache implements Closeable
}
/**
+ * Sets the executor to publish events; a default executor will be created if not specified.
+ */
+ public Builder setExecutor(Executor executor)
+ {
+ this.executor = checkNotNull(executor);
+ return this;
+ }
+
+ /**
* Sets the maximum depth to explore/watch. A {@code maxDepth} of {@code 0} will watch only
* the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the
* root node and its immediate children (kind of like {@link PathChildrenCache}.
@@ -564,7 +576,7 @@ public class TreeCache implements Closeable
*/
public TreeCache(CuratorFramework client, String path)
{
- this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
+ this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector());
}
/**
@@ -573,12 +585,14 @@ public class TreeCache implements Closeable
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the TreeCache's background thread
+ * @param executor executor to use for the TreeCache's background thread
* @param createParentNodes true to create parent nodes as containers
* @param disableZkWatches true to disable Zookeeper watches
* @param selector the selector to use
*/
- TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+ TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
{
+ this.executor = executor;
this.createParentNodes = createParentNodes;
this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
this.root = new TreeNode(validatePath(path), null);
@@ -588,7 +602,7 @@ public class TreeCache implements Closeable
this.dataIsCompressed = dataIsCompressed;
this.maxDepth = maxDepth;
this.disableZkWatches = disableZkWatches;
- this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
+ this.executorService = executorService;
}
/**
@@ -623,7 +637,10 @@ public class TreeCache implements Closeable
client.removeWatchers();
client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
- executorService.shutdown();
+ if ( executorService != null )
+ {
+ executorService.shutdown();
+ }
try
{
root.wasDeleted();
@@ -857,8 +874,9 @@ public class TreeCache implements Closeable
{
if ( treeState.get() != TreeState.CLOSED )
{
+ Executor localExecutor = (executorService != null) ? executorService : executor;
LOG.debug("publishEvent: {}", event);
- executorService.submit(new Runnable()
+ localExecutor.execute(new Runnable()
{
@Override
public void run()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..b80c71e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -20,16 +20,19 @@ package org.apache.curator.framework.recipes.nodes;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
import java.io.Closeable;
+import java.util.AbstractMap;
+import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Group membership management. Adds this instance into a group and
@@ -37,8 +40,9 @@ import java.util.Map;
*/
public class GroupMember implements Closeable
{
- private final PersistentEphemeralNode pen;
- private final PathChildrenCache cache;
+ private final PersistentNode pen;
+ private final CuratorCacheBridge cache;
+ private final String membershipPath;
private final String thisId;
/**
@@ -59,9 +63,10 @@ public class GroupMember implements Closeable
*/
public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
{
+ this.membershipPath = membershipPath;
this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null");
- cache = newPathChildrenCache(client, membershipPath);
+ cache = newCache(client, membershipPath);
pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload);
}
@@ -119,19 +124,11 @@ public class GroupMember implements Closeable
*/
public Map<String, byte[]> getCurrentMembers()
{
- ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
- boolean thisIdAdded = false;
- for ( ChildData data : cache.getCurrentData() )
- {
- String id = idFromPath(data.getPath());
- thisIdAdded = thisIdAdded || id.equals(thisId);
- builder.put(id, data.getData());
- }
- if ( !thisIdAdded )
- {
- builder.put(thisId, pen.getData()); // this instance is always a member
- }
- return builder.build();
+ Map<String, byte[]> map = new HashMap<>();
+ map.put(thisId, pen.getData());
+ return cache.streamImmediateChildren(membershipPath)
+ .map(data -> new AbstractMap.SimpleEntry<>(idFromPath(data.getPath()), data.getData()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k, v) -> v, () -> map));
}
/**
@@ -145,13 +142,13 @@ public class GroupMember implements Closeable
return ZKPaths.getNodeFromPath(path);
}
- protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
+ protected PersistentNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
{
- return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload);
+ return new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload);
}
- protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath)
+ protected CuratorCacheBridge newCache(CuratorFramework client, String membershipPath)
{
- return new PathChildrenCache(client, membershipPath, true);
+ return CuratorCache.builder(client, membershipPath).buildBridge(true);
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
index 4a75acf..edfecb5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -56,7 +56,7 @@ public class TestCuratorCacheWrappers extends CuratorTestBase
events.offer(event.getType());
}
};
- cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+ cache.listenable().addListener(builder().forPathChildrenCache(client, listener, "/test").build());
cache.start();
client.create().forPath("/test/one", "hey there".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..85a2097 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Map;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestGroupMember extends BaseClassForTests
{
// NOTE - don't need many tests as this class is just a wrapper around two existing recipes
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 8d0d220..53f33f7 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
<dependency>
<groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<exclusions>
<exclusion>
@@ -114,6 +126,32 @@
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-async</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
@@ -124,6 +162,12 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -151,6 +195,8 @@
<dependenciesToScan>
<dependency>org.apache.curator:curator-framework</dependency>
<dependency>org.apache.curator:curator-recipes</dependency>
+ <dependency>org.apache.curator:curator-x-async</dependency>
+ <dependency>org.apache.curator:curator-x-discovery</dependency>
</dependenciesToScan>
<groups>zk35,zk35TestCompatibility</groups>
<excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
{
private final ModeledFramework<T> client;
private final ModeledCacheImpl<T> cache;
- private final Executor executor;
CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
{
- this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+ this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
}
- private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+ private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
{
this.client = client;
this.cache = cache;
- this.executor = executor;
}
@Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> child(Object child)
{
- return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+ return new CachedModeledFrameworkImpl<>(client.child(child), cache);
}
@Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
@Override
public CachedModeledFramework<T> withPath(ZPath path)
{
- return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+ return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
}
@Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..6fe866d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
package org.apache.curator.x.async.modeled.details;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.ModeledCache;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
import org.apache.zookeeper.data.Stat;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
{
- private final TreeCache cache;
+ private final CuratorCacheBridge cache;
private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
private final ModelSerializer<T> serializer;
private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
private final ZPath basePath;
+ private final CuratorFramework client;
+ private final Set<CreateOption> options;
private static final class Entry<T>
{
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
{
+ this.client = client;
if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
{
modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
@@ -69,19 +78,40 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
basePath = modelSpec.path();
this.serializer = modelSpec.serializer();
- cache = TreeCache.newBuilder(client, basePath.fullPath())
- .setCacheData(false)
- .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
- .setExecutor(executor)
- .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
- .build();
+ options = modelSpec.createOptions();
+ CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath());
+ if ( modelSpec.createOptions().contains(CreateOption.compress) )
+ {
+ builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+ }
+ if ( executor != null )
+ {
+ builder.withExecutor(executor);
+ }
+ cache = builder.buildBridge(false);
}
public void start()
{
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .forTreeCache(client, this)
+ .withPathFilter(path -> !path.equals(basePath.fullPath()))
+ .build();
try
{
- cache.getListenable().addListener(this);
+ if ( options.contains(CreateOption.createParentsIfNeeded) )
+ {
+ if ( options.contains(CreateOption.createParentsAsContainers) )
+ {
+ new EnsureContainers(client, basePath.fullPath()).ensure();
+ }
+ else
+ {
+ ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false);
+ }
+ }
+
+ cache.listenable().addListener(listener);
cache.start();
}
catch ( Exception e )
@@ -92,7 +122,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
public void close()
{
- cache.getListenable().removeListener(this);
cache.close();
entries.clear();
}
@@ -159,16 +188,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
case NODE_UPDATED:
{
ZPath path = ZPath.parse(event.getData().getPath());
- if ( !path.equals(basePath) )
+ byte[] bytes = event.getData().getData();
+ if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created
{
- byte[] bytes = event.getData().getData();
- if ( (bytes != null) && (bytes.length > 0) ) // otherwise it's probably just a parent node being created
- {
- T model = serializer.deserialize(bytes);
- entries.put(path, new Entry<>(event.getData().getStat(), model));
- ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
- accept(type, path, event.getData().getStat(), model);
- }
+ T model = serializer.deserialize(bytes);
+ entries.put(path, new Entry<>(event.getData().getStat(), model));
+ ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+ accept(type, path, event.getData().getStat(), model);
}
break;
}
@@ -176,13 +202,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
case NODE_REMOVED:
{
ZPath path = ZPath.parse(event.getData().getPath());
- if ( !path.equals(basePath) )
- {
- Entry<T> entry = entries.remove(path);
- T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
- Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
- accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
- }
+ Entry<T> entry = entries.remove(path);
+ T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+ Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+ accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
break;
}
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index dbbf3cb..799845f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
@Override
public CachedModeledFramework<T> cached()
{
- return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+ return cached(null);
}
@Override
public CachedModeledFramework<T> cached(ExecutorService executor)
{
Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
- return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
+ return new CachedModeledFrameworkImpl<>(this, null);
}
@Override
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
import com.google.common.collect.Sets;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestCachedModeledFramework extends TestModeledFrameworkBase
{
@Test
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
protected ModelSpec<TestNewerModel> newModelSpec;
protected AsyncCuratorFramework async;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
@Override
public void setup() throws Exception
{
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
newModelSpec = ModelSpec.builder(path, newSerializer).build();
}
- @AfterMethod
+ @AfterMethod(alwaysRun = true)
@Override
public void teardown() throws Exception
{
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..270005e 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import java.io.Closeable;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
{
@@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
*
* @return the list
*/
- public List<ServiceInstance<T>> getInstances();
+ List<ServiceInstance<T>> getInstances();
/**
- * The cache must be started before use
+ * The cache must be started before use. This method blocks while the internal
+ * cache is loaded.
*
* @throws Exception errors
*/
- public void start() throws Exception;
+ void start() throws Exception;
+
+ /**
+ * The cache must be started before use. This version returns immediately.
+ * Use the returned latch to block until the cache is loaded
+ *
+ * @return a latch that can be used to block until the cache is loaded
+ * @throws Exception errors
+ */
+ default CountDownLatch startImmediate() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index f542ed3..d09885b 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
/**
* The main API for Discovery. This class is essentially a facade over a {@link ProviderStrategy}
@@ -31,11 +32,24 @@ import java.util.Collection;
public interface ServiceProvider<T> extends Closeable
{
/**
- * The provider must be started before use
+ * The provider must be started before use. This method blocks while the internal
+ * cache is loaded.
*
* @throws Exception any errors
*/
- public void start() throws Exception;
+ void start() throws Exception;
+
+ /**
+ * The provider must be started before use. This version returns immediately.
+ * Use the returned latch to block until the cache is loaded
+ *
+ * @return a latch that can be used to block until the cache is loaded
+ * @throws Exception errors
+ */
+ default CountDownLatch startImmediate() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
/**
* Return an instance for a single use. <b>IMPORTANT: </b> users
@@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable
* @return the instance to use
* @throws Exception any errors
*/
- public ServiceInstance<T> getInstance() throws Exception;
+ ServiceInstance<T> getInstance() throws Exception;
/**
* Return the current available set of instances <b>IMPORTANT: </b> users
@@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable
* @return all known instances
* @throws Exception any errors
*/
- public Collection<ServiceInstance<T>> getAllInstances() throws Exception;
+ Collection<ServiceInstance<T>> getAllInstances() throws Exception;
/**
* Take note of an error connecting to the given instance. The instance will potentially
@@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable
*
* @param instance instance that had an error
*/
- public void noteError(ServiceInstance<T> instance);
+ void noteError(ServiceInstance<T> instance);
/**
* Close the provider. Note: it's the provider's responsibility to close any caches it manages
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..501e289 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
@Override
public ServiceCache<T> build()
{
- if (executorService != null)
+ if (threadFactory != null)
{
- return new ServiceCacheImpl<T>(discovery, name, executorService);
+ return new ServiceCacheImpl<T>(discovery, name, threadFactory);
}
else
{
- return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+ return new ServiceCacheImpl<T>(discovery, name, executorService);
}
}
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..6ff68f1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,18 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference;
public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
{
- private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
- private final ServiceDiscoveryImpl<T> discovery;
- private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
- private final PathChildrenCache cache;
- private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+ private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
+ private final ServiceDiscoveryImpl<T> discovery;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final CuratorCacheBridge cache;
+ private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+ private final CountDownLatch initializedLatch = new CountDownLatch(1);
+ private String path;
private enum State
{
- LATENT,
- STARTED,
- STOPPED
+ LATENT, STARTED, STOPPED
}
private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -73,12 +78,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
{
Preconditions.checkNotNull(discovery, "discovery cannot be null");
Preconditions.checkNotNull(name, "name cannot be null");
- Preconditions.checkNotNull(executorService, "executorService cannot be null");
this.discovery = discovery;
- cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
- cache.getListenable().addListener(this);
+ path = discovery.pathForName(name);
+
+ CuratorCacheBuilder builder = CuratorCache.builder(discovery.getClient(), path);
+ if ( executorService != null )
+ {
+ builder.withExecutor(executorService::submit);
+ }
+ cache = builder.buildBridge(false);
+
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .forPathChildrenCache(discovery.getClient(), this, path)
+ .forInitialized(this::initialized)
+ .withPathFilter(p -> !p.equals(path))
+ .build();
+ cache.listenable().addListener(listener);
}
@Override
@@ -94,11 +111,20 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
@Override
public void start() throws Exception
{
+ startImmediate().await();
+ }
+
+ @Override
+ public CountDownLatch startImmediate() throws Exception
+ {
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- cache.start(true);
+ new EnsureContainers(discovery.getClient(), path).ensure();
+
+ cache.start();
if ( debugStartLatch != null )
{
+ initializedLatch.await();
debugStartLatch.countDown();
debugStartLatch = null;
}
@@ -108,14 +134,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
debugStartWaitLatch = null;
}
- for ( ChildData childData : cache.getCurrentData() )
- {
- if ( childData.getData() != null ) // else already processed by the cache listener
- {
- addInstance(childData, true);
- }
- }
- discovery.cacheOpened(this);
+ return initializedLatch;
}
@Override
@@ -123,18 +142,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
- listenerContainer.forEach
- (
- new Function<ServiceCacheListener, Void>()
- {
- @Override
- public Void apply(ServiceCacheListener listener)
- {
- discovery.getClient().getConnectionStateListenable().removeListener(listener);
- return null;
- }
- }
- );
+ listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+ {
+ @Override
+ public Void apply(ServiceCacheListener listener)
+ {
+ discovery.getClient().getConnectionStateListenable().removeListener(listener);
+ return null;
+ }
+ });
listenerContainer.clear();
CloseableUtils.closeQuietly(cache);
@@ -166,39 +182,36 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- boolean notifyListeners = false;
+ boolean notifyListeners = false;
switch ( event.getType() )
{
- case CHILD_ADDED:
- case CHILD_UPDATED:
- {
- addInstance(event.getData(), false);
- notifyListeners = true;
- break;
- }
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ {
+ addInstance(event.getData());
+ notifyListeners = true;
+ break;
+ }
- case CHILD_REMOVED:
- {
- instances.remove(instanceIdFromData(event.getData()));
- notifyListeners = true;
- break;
- }
+ case CHILD_REMOVED:
+ {
+ instances.remove(instanceIdFromData(event.getData()));
+ notifyListeners = true;
+ break;
+ }
}
- if ( notifyListeners )
+ if ( notifyListeners && (initializedLatch.getCount() == 0) )
{
- listenerContainer.forEach
- (
- new Function<ServiceCacheListener, Void>()
+ listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+ {
+ @Override
+ public Void apply(ServiceCacheListener listener)
{
- @Override
- public Void apply(ServiceCacheListener listener)
- {
- listener.cacheChanged();
- return null;
- }
+ listener.cacheChanged();
+ return null;
}
- );
+ });
}
}
@@ -207,18 +220,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
return ZKPaths.getNodeFromPath(childData.getPath());
}
- private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
+ private void addInstance(ChildData childData)
{
- String instanceId = instanceIdFromData(childData);
- ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
- if ( onlyIfAbsent )
+ try
{
- instances.putIfAbsent(instanceId, serviceInstance);
+ String instanceId = instanceIdFromData(childData);
+ ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+ instances.put(instanceId, serviceInstance);
}
- else
+ catch ( Exception e )
{
- instances.put(instanceId, serviceInstance);
+ throw new RuntimeException(e);
}
- cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
+ }
+
+ private void initialized()
+ {
+ discovery.cacheOpened(this);
+ initializedLatch.countDown();
}
}
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..13ae887 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
private static class Entry<T>
{
private volatile ServiceInstance<T> service;
- private volatile NodeCache cache;
+ private volatile CuratorCacheBridge cache;
private Entry(ServiceInstance<T> service)
{
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
- return new ServiceCacheBuilderImpl<T>(this)
- .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+ return new ServiceCacheBuilderImpl<T>(this);
}
/**
@@ -458,52 +458,47 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
}
}
- private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+ private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
{
if ( !watchInstances )
{
return null;
}
- final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
- try
- {
- nodeCache.start(true);
- }
- catch ( InterruptedException e)
- {
- Thread.currentThread().interrupt();
- return null;
- }
- catch ( Exception e )
- {
- log.error("Could not start node cache for: " + instance, e);
- }
- NodeCacheListener listener = new NodeCacheListener()
- {
- @Override
- public void nodeChanged() throws Exception
- {
- if ( nodeCache.getCurrentData() != null )
+ CuratorCacheBridge cache = CuratorCache.builder(client, pathForInstance(instance.getName(), instance.getId()))
+ .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+ .buildBridge(false);
+ CuratorCacheListener listener = CuratorCacheListener.builder()
+ .afterInitialized()
+ .forAll((__, ___, data) -> {
+ if ( data != null )
{
- ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
- Entry<T> entry = services.get(newInstance.getId());
- if ( entry != null )
+ try
{
- synchronized(entry)
+ ServiceInstance<T> newInstance = serializer.deserialize(data.getData());
+ Entry<T> entry = services.get(newInstance.getId());
+ if ( entry != null )
{
- entry.service = newInstance;
+ synchronized(entry)
+ {
+ entry.service = newInstance;
+ }
}
}
+ catch ( Exception e )
+ {
+ log.debug("Could not deserialize: " + data.getPath());
+ }
}
else
{
log.warn("Instance data has been deleted for: " + instance);
}
- }
- };
- nodeCache.getListenable().addListener(listener);
- return nodeCache;
+ })
+ .build();
+ cache.listenable().addListener(listener);
+ cache.start();
+ return cache;
}
private void internalUnregisterService(final Entry<T> entry) throws Exception
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..a411f33 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
/**
@@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T>
discovery.providerOpened(this);
}
+ @Override
+ public CountDownLatch startImmediate() throws Exception
+ {
+ discovery.providerOpened(this);
+ return cache.startImmediate();
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestServiceCache extends BaseClassForTests
{
@Test
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..e9a7956 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestServiceCacheRace extends BaseClassForTests
{
private final Timing timing = new Timing();
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..d86e193 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.x.discovery.ServiceDiscovery;
@@ -39,6 +40,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestServiceDiscovery extends BaseClassForTests
{
private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
index 2d03c47..a96ff6f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -35,6 +36,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
public class TestWatchedInstances extends BaseClassForTests
{
@Test
diff --git a/pom.xml b/pom.xml
index 2556c6b..19417e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
@@ -400,6 +407,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-async</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>${commons-math-version}</version>