You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/12 14:58:05 UTC

[curator] 03/03: CURATOR-549

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 17ca4a39811efdf1b14b938773bc2d465ae1d049
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0, creates a CuratorCache and, for earlier versions, creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 126 +++++++++++++++++
 .../recipes/cache/CuratorCacheBridge.java          |  59 ++++++++
 .../recipes/cache/CuratorCacheBuilder.java         |   8 ++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  19 +++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   9 +-
 .../recipes/cache/CuratorCacheListenerBuilder.java |  22 ++-
 .../cache/CuratorCacheListenerBuilderImpl.java     |  19 ++-
 .../recipes/cache/CuratorCacheStorage.java         |   1 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  11 +-
 .../curator/framework/recipes/cache/TreeCache.java |  36 +++--
 .../framework/recipes/nodes/GroupMember.java       |  43 +++---
 .../recipes/cache/TestCuratorCacheWrappers.java    |   2 +-
 .../framework/recipes/nodes/TestGroupMember.java   |   2 +
 curator-test-zk35/pom.xml                          |  46 ++++++
 curator-x-async/pom.xml                            |  15 ++
 .../details/CachedModeledFrameworkImpl.java        |  11 +-
 .../x/async/modeled/details/ModeledCacheImpl.java  |  77 +++++++----
 .../modeled/details/ModeledFrameworkImpl.java      |   4 +-
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../x/async/modeled/TestModeledFrameworkBase.java  |   4 +-
 curator-x-discovery/pom.xml                        |  15 ++
 .../apache/curator/x/discovery/ServiceCache.java   |  20 ++-
 .../curator/x/discovery/ServiceProvider.java       |  24 +++-
 .../discovery/details/ServiceCacheBuilderImpl.java |   6 +-
 .../x/discovery/details/ServiceCacheImpl.java      | 154 ++++++++++++---------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  65 ++++-----
 .../x/discovery/details/ServiceProviderImpl.java   |   8 ++
 .../curator/x/discovery/TestServiceCache.java      |   2 +
 .../x/discovery/details/TestServiceCacheRace.java  |   2 +
 .../x/discovery/details/TestServiceDiscovery.java  |   2 +
 .../x/discovery/details/TestWatchedInstances.java  |   2 +
 pom.xml                                            |  14 ++
 32 files changed, 632 insertions(+), 198 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..f7c1428
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.stream.Stream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener
+{
+    private final TreeCache cache;
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor, boolean cacheData)
+    {
+        Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet();
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0);
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executor != null )
+        {
+            builder.setExecutor(executor);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this);
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close();
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        Map<String, ChildData> currentChildren = cache.getCurrentChildren(fromParent);
+        if ( currentChildren == null )
+        {
+            return Stream.empty();
+        }
+        return currentChildren.values().stream();
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() )
+        {
+            case NODE_ADDED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED:
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED:
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..3e329fc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+/**
+ * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise
+ */
+@SuppressWarnings("deprecation")
+public interface CuratorCacheBridge extends Closeable
+{
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @param fromParent the parent node - determines the children returned in the stream
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..a249d60 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,12 @@ public interface CuratorCacheBuilder
      * @return new Curator Cache
      */
     CuratorCache build();
+
+    /**
+     * Return a new bridge cache based on the builder methods that have been called.
+     *
+     * @param cacheData if true, keep the data bytes cached. If false, clear them after sending notifications
+     * @return new bridge cache
+     */
+    CuratorCacheBridge buildBridge(boolean cacheData);
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..8d7fa4b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
+        return internalBuild(storage);
+    }
+
+    @Override
+    public CuratorCacheBridge buildBridge(boolean cacheData)
+    {
+        Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge");
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return internalBuild(cacheData ? null : CuratorCacheStorage.bytesNotCached());
+        }
+        Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge");
+        return new CompatibleCuratorCacheBridge(client, path, options, executor, cacheData);
+    }
+
+    private CuratorCacheImpl internalBuild(CuratorCacheStorage storage)
+    {
         return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
     }
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index e6be71c..01faa39 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -40,12 +40,13 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -113,6 +114,12 @@ class CuratorCacheImpl implements CuratorCache
     }
 
     @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return storage.streamImmediateChildren(fromParent);
+    }
+
+    @Override
     public Listenable<CuratorCacheListener> listenable()
     {
         return listenerManager;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
index c57e881..9381546 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.cache;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public interface CuratorCacheListenerBuilder
 {
@@ -88,9 +89,11 @@ public interface CuratorCacheListenerBuilder
      *
      * @param client the curator client
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @param basePath the path used as the root in the cache. Only events with a parent path matching this
+     *                 base path are sent to the listener
+     * @return this
      */
-    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath);
 
     /**
      * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
@@ -101,7 +104,7 @@ public interface CuratorCacheListenerBuilder
      *
      * @param client the curator client
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @return this
      */
     CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
 
@@ -110,17 +113,28 @@ public interface CuratorCacheListenerBuilder
      * with CuratorCache.
      *
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @return this
      */
     CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
 
     /**
      * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
      * i.e. changes that occur as the cache is initializing are not sent to the listener
+     *
+     * @return this
      */
     CuratorCacheListenerBuilder afterInitialized();
 
     /**
+     * Make the built listener so that it is only called for paths for which the given
+     * filter returns true.
+     *
+     * @param pathFilter path filter
+     * @return this
+     */
+    CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter);
+
+    /**
      * Build and return a new listener based on the methods that have been previously called
      *
      * @return new listener
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
index 4873868..479ba34 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -20,14 +20,17 @@
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
 {
     private final List<CuratorCacheListener> listeners = new ArrayList<>();
     private boolean afterInitializedOnly = false;
+    private Predicate<String> pathFilter = __ -> true;
 
     @Override
     public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
@@ -106,8 +109,9 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
     }
 
     @Override
-    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath)
     {
+        pathFilter = p -> ZKPaths.getPathAndNode(p).getPath().equals(basePath);
         listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
         return this;
     }
@@ -134,6 +138,13 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
     }
 
     @Override
+    public CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter)
+    {
+        this.pathFilter = (pathFilter != null) ? (p -> (p == null) || pathFilter.test(p)) : (__ -> true);
+        return this;
+    }
+
+    @Override
     public CuratorCacheListener build()
     {
         List<CuratorCacheListener> copy = new ArrayList<>(listeners);
@@ -146,7 +157,11 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
             {
                 if ( isInitialized )
                 {
-                    copy.forEach(l -> l.event(type, oldData, data));
+                    ChildData filterData = (data != null) ? data : oldData;
+                    if ( pathFilter.test(filterData.getPath()) )
+                    {
+                        copy.forEach(l -> l.event(type, oldData, data));
+                    }
                 }
             }
 
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index e809263..30f2b81 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -92,6 +92,7 @@ public interface CuratorCacheStorage
     /**
      * Return a stream over the storage entries that are the immediate children of the given node.
      *
+     * @param fromParent the parent node - determines the children returned in the stream
      * @return stream over entries
      */
     Stream<ChildData> streamImmediateChildren(String fromParent);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
         {
             case NODE_CREATED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
                 break;
             }
 
             case NODE_CHANGED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
                 break;
             }
 
             case NODE_DELETED:
             {
-                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
                 break;
             }
         }
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
     @Override
     public void initialized()
     {
-        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+        sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
     }
 
-    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    private void sendEvent(PathChildrenCacheEvent event)
     {
-        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
         try
         {
             listener.childEvent(client, event);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index e2f3a8b..121e72c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final Executor executor;
     private final boolean createParentNodes;
     private final boolean disableZkWatches;
     private final TreeCacheSelector selector;
@@ -89,6 +91,7 @@ public class TreeCache implements Closeable
         private boolean cacheData = true;
         private boolean dataIsCompressed = false;
         private ExecutorService executorService = null;
+        private Executor executor = null;
         private int maxDepth = Integer.MAX_VALUE;
         private boolean createParentNodes = false;
         private boolean disableZkWatches = false;
@@ -105,12 +108,12 @@ public class TreeCache implements Closeable
          */
         public TreeCache build()
         {
-            ExecutorService executor = executorService;
-            if ( executor == null )
+            ExecutorService localExecutorService = executorService;
+            if ( (localExecutorService == null) && (executor == null) )
             {
-                executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
+                localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory);
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector);
         }
 
         /**
@@ -149,6 +152,15 @@ public class TreeCache implements Closeable
         }
 
         /**
+         * Sets the executor to publish events; a default executor will be created if not specified.
+         */
+        public Builder setExecutor(Executor executor)
+        {
+            this.executor = checkNotNull(executor);
+            return this;
+        }
+
+        /**
          * Sets the maximum depth to explore/watch.  A {@code maxDepth} of {@code 0} will watch only
          * the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the
          * root node and its immediate children (kind of like {@link PathChildrenCache}.
@@ -564,7 +576,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector());
     }
 
     /**
@@ -573,12 +585,14 @@ public class TreeCache implements Closeable
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the TreeCache's background thread
+     * @param executor          executor to use for the TreeCache's background thread
      * @param createParentNodes true to create parent nodes as containers
      * @param disableZkWatches true to disable Zookeeper watches
      * @param selector         the selector to use
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
     {
+        this.executor = executor;
         this.createParentNodes = createParentNodes;
         this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
         this.root = new TreeNode(validatePath(path), null);
@@ -588,7 +602,7 @@ public class TreeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
-        this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
+        this.executorService = executorService;
     }
 
     /**
@@ -623,7 +637,10 @@ public class TreeCache implements Closeable
             client.removeWatchers();
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
-            executorService.shutdown();
+            if ( executorService != null )
+            {
+                executorService.shutdown();
+            }
             try
             {
                 root.wasDeleted();
@@ -857,8 +874,9 @@ public class TreeCache implements Closeable
     {
         if ( treeState.get() != TreeState.CLOSED )
         {
+            Executor localExecutor = (executorService != null) ? executorService : executor;
             LOG.debug("publishEvent: {}", event);
-            executorService.submit(new Runnable()
+            localExecutor.execute(new Runnable()
             {
                 @Override
                 public void run()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..b80c71e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -20,16 +20,19 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import java.io.Closeable;
+import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Group membership management. Adds this instance into a group and
@@ -37,8 +40,9 @@ import java.util.Map;
  */
 public class GroupMember implements Closeable
 {
-    private final PersistentEphemeralNode pen;
-    private final PathChildrenCache cache;
+    private final PersistentNode pen;
+    private final CuratorCacheBridge cache;
+    private final String membershipPath;
     private final String thisId;
 
     /**
@@ -59,9 +63,10 @@ public class GroupMember implements Closeable
      */
     public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
     {
+        this.membershipPath = membershipPath;
         this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null");
 
-        cache = newPathChildrenCache(client, membershipPath);
+        cache = newCache(client, membershipPath);
         pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload);
     }
 
@@ -119,19 +124,11 @@ public class GroupMember implements Closeable
      */
     public Map<String, byte[]> getCurrentMembers()
     {
-        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
-        boolean thisIdAdded = false;
-        for ( ChildData data : cache.getCurrentData() )
-        {
-            String id = idFromPath(data.getPath());
-            thisIdAdded = thisIdAdded || id.equals(thisId);
-            builder.put(id, data.getData());
-        }
-        if ( !thisIdAdded )
-        {
-            builder.put(thisId, pen.getData());   // this instance is always a member
-        }
-        return builder.build();
+        Map<String, byte[]> map = new HashMap<>();
+        map.put(thisId, pen.getData());
+        return cache.streamImmediateChildren(membershipPath)
+            .map(data -> new AbstractMap.SimpleEntry<>(idFromPath(data.getPath()), data.getData()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k, v) -> v, () -> map));
     }
 
     /**
@@ -145,13 +142,13 @@ public class GroupMember implements Closeable
         return ZKPaths.getNodeFromPath(path);
     }
 
-    protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
+    protected PersistentNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
     {
-        return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload);
+        return new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload);
     }
 
-    protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath)
+    protected CuratorCacheBridge newCache(CuratorFramework client, String membershipPath)
     {
-        return new PathChildrenCache(client, membershipPath, true);
+        return CuratorCache.builder(client, membershipPath).buildBridge(true);
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
index 4a75acf..edfecb5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -56,7 +56,7 @@ public class TestCuratorCacheWrappers extends CuratorTestBase
                         events.offer(event.getType());
                     }
                 };
-                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener, "/test").build());
                 cache.start();
 
                 client.create().forPath("/test/one", "hey there".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..85a2097 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Map;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestGroupMember extends BaseClassForTests
 {
     // NOTE - don't need many tests as this class is just a wrapper around two existing recipes
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 8d0d220..53f33f7 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -114,6 +126,32 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>
@@ -124,6 +162,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -151,6 +195,8 @@
                     <dependenciesToScan>
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
+                        <dependency>org.apache.curator:curator-x-async</dependency>
+                        <dependency>org.apache.curator:curator-x-discovery</dependency>
                     </dependenciesToScan>
                     <groups>zk35,zk35TestCompatibility</groups>
                     <excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
-    private final Executor executor;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
     {
         this.client = client;
         this.cache = cache;
-        this.executor = executor;
     }
 
     @Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> child(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache);
     }
 
     @Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
     }
 
     @Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..6fe866d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
     private final ZPath basePath;
+    private final CuratorFramework client;
+    private final Set<CreateOption> options;
 
     private static final class Entry<T>
     {
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
     {
+        this.client = client;
         if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
         {
             modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
@@ -69,19 +78,40 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        options = modelSpec.createOptions();
+        CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath());
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        if ( executor != null )
+        {
+            builder.withExecutor(executor);
+        }
+        cache = builder.buildBridge(false);
     }
 
     public void start()
     {
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forTreeCache(client, this)
+            .withPathFilter(path -> !path.equals(basePath.fullPath()))
+            .build();
         try
         {
-            cache.getListenable().addListener(this);
+            if ( options.contains(CreateOption.createParentsIfNeeded) || options.contains(CreateOption.createParentsAsContainers) )
+            {
+                if ( options.contains(CreateOption.createParentsAsContainers) )
+                {
+                    new EnsureContainers(client, basePath.fullPath()).ensure();
+                }
+                else
+                {
+                    ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false);
+                }
+            }
+
+            cache.listenable().addListener(listener);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +122,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
@@ -159,16 +188,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_UPDATED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
+            byte[] bytes = event.getData().getData();
+            if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
             {
-                byte[] bytes = event.getData().getData();
-                if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
-                {
-                    T model = serializer.deserialize(bytes);
-                    entries.put(path, new Entry<>(event.getData().getStat(), model));
-                    ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
-                    accept(type, path, event.getData().getStat(), model);
-                }
+                T model = serializer.deserialize(bytes);
+                entries.put(path, new Entry<>(event.getData().getStat(), model));
+                ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                accept(type, path, event.getData().getStat(), model);
             }
             break;
         }
@@ -176,13 +202,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_REMOVED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
-            {
-                Entry<T> entry = entries.remove(path);
-                T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
-                Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
-                accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
-            }
+            Entry<T> entry = entries.remove(path);
+            T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+            Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+            accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
             break;
         }
 
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index dbbf3cb..799845f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public CachedModeledFramework<T> cached()
     {
-        return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+        return cached(null);
     }
 
     @Override
     public CachedModeledFramework<T> cached(ExecutorService executor)
     {
         Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
-        return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
+        return new CachedModeledFrameworkImpl<>(this, executor);
     }
 
     @Override
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
     protected ModelSpec<TestNewerModel> newModelSpec;
     protected AsyncCuratorFramework async;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..270005e 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
-     * The cache must be started before use
+     * The cache must be started before use. This method blocks while the internal
+     * cache is loaded.
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The cache must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded.
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index f542ed3..d09885b 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * The main API for Discovery. This class is essentially a facade over a {@link ProviderStrategy}
@@ -31,11 +32,24 @@ import java.util.Collection;
 public interface ServiceProvider<T> extends Closeable
 {
     /**
-     * The provider must be started before use
+     * The provider must be started before use. This method blocks while the internal
+     * cache is loaded.
      *
      * @throws Exception any errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The provider must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded.
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 
     /**
      * Return an instance for a single use. <b>IMPORTANT: </b> users
@@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return the instance to use
      * @throws Exception any errors
      */
-    public ServiceInstance<T> getInstance() throws Exception;
+    ServiceInstance<T> getInstance() throws Exception;
 
     /**
      * Return the current available set of instances <b>IMPORTANT: </b> users
@@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return all known instances
      * @throws Exception any errors
      */
-    public Collection<ServiceInstance<T>> getAllInstances() throws Exception;
+    Collection<ServiceInstance<T>> getAllInstances() throws Exception;
 
     /**
      * Take note of an error connecting to the given instance. The instance will potentially
@@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable
      *
      * @param instance instance that had an error
      */
-    public void noteError(ServiceInstance<T> instance);
+    void noteError(ServiceInstance<T> instance);
 
     /**
      * Close the provider. Note: it's the provider's responsibility to close any caches it manages
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..501e289 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        if (executorService != null)
+        if (threadFactory != null)
         {
-            return new ServiceCacheImpl<T>(discovery, name, executorService);
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
         }
         else
         {
-            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
         }
     }
 
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..6ff68f1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final CuratorCacheBridge cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+    private String path;
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -73,12 +78,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(executorService, "executorService cannot be null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        path = discovery.pathForName(name);
+
+        CuratorCacheBuilder builder = CuratorCache.builder(discovery.getClient(), path);
+        if ( executorService != null )
+        {
+            builder.withExecutor(executorService::submit);
+        }
+        cache = builder.buildBridge(false);
+
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forPathChildrenCache(discovery.getClient(), this, path)
+            .forInitialized(this::initialized)
+            .withPathFilter(p -> !p.equals(path))
+            .build();
+        cache.listenable().addListener(listener);
     }
 
     @Override
@@ -94,11 +111,20 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        new EnsureContainers(discovery.getClient(), path).ensure();
+
+        cache.start();
         if ( debugStartLatch != null )
         {
+            initializedLatch.await();
             debugStartLatch.countDown();
             debugStartLatch = null;
         }
@@ -108,14 +134,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
@@ -123,18 +142,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +182,36 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
-            {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            addInstance(event.getData());
+            notifyListeners = true;
+            break;
+        }
 
-            case CHILD_REMOVED:
-            {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_REMOVED:
+        {
+            instances.remove(instanceIdFromData(event.getData()));
+            notifyListeners = true;
+            break;
+        }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -207,18 +220,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
+    private void addInstance(ChildData childData)
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..13ae887 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private static class Entry<T>
     {
         private volatile ServiceInstance<T> service;
-        private volatile NodeCache cache;
+        private volatile CuratorCacheBridge cache;
 
         private Entry(ServiceInstance<T> service)
         {
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        return new ServiceCacheBuilderImpl<T>(this)
-            .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+        return new ServiceCacheBuilderImpl<T>(this);
     }
 
     /**
@@ -458,52 +458,47 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         }
     }
 
-    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+    private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
     {
         if ( !watchInstances )
         {
             return null;
         }
 
-        final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
-        try
-        {
-            nodeCache.start(true);
-        }
-        catch ( InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-        catch ( Exception e )
-        {
-            log.error("Could not start node cache for: " + instance, e);
-        }
-        NodeCacheListener listener = new NodeCacheListener()
-        {
-            @Override
-            public void nodeChanged() throws Exception
-            {
-                if ( nodeCache.getCurrentData() != null )
+        CuratorCacheBridge cache = CuratorCache.builder(client, pathForInstance(instance.getName(), instance.getId()))
+            .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+            .buildBridge(false);
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .afterInitialized()
+            .forAll((__, ___, data) -> {
+                if ( data != null )
                 {
-                    ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
-                    Entry<T> entry = services.get(newInstance.getId());
-                    if ( entry != null )
+                    try
                     {
-                        synchronized(entry)
+                        ServiceInstance<T> newInstance = serializer.deserialize(data.getData());
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            entry.service = newInstance;
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
+                    catch ( Exception e )
+                    {
+                        log.debug("Could not deserialize: " + data.getPath());
+                    }
                 }
                 else
                 {
                     log.warn("Instance data has been deleted for: " + instance);
                 }
-            }
-        };
-        nodeCache.getListenable().addListener(listener);
-        return nodeCache;
+            })
+            .build();
+        cache.listenable().addListener(listener);
+        cache.start();
+        return cache;
     }
 
     private void internalUnregisterService(final Entry<T> entry) throws Exception
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..a411f33 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T>
         discovery.providerOpened(this);
     }
 
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
+        discovery.providerOpened(this);
+        return cache.startImmediate();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..e9a7956 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCacheRace extends BaseClassForTests
 {
     private final Timing timing = new Timing();
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..d86e193 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceDiscovery extends BaseClassForTests
 {
     private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
index 2d03c47..a96ff6f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -35,6 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestWatchedInstances extends BaseClassForTests
 {
     @Test
diff --git a/pom.xml b/pom.xml
index 2556c6b..19417e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-discovery</artifactId>
+                <version>${project.version}</version>
             </dependency>
 
             <dependency>
@@ -400,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-async</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-math</artifactId>
                 <version>${commons-math-version}</version>