You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2019/11/08 23:38:45 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-bridge updated (65be194 -> a34beef)

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a change to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git.


 discard 65be194  CURATOR-549
 discard 01706e0  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
     add 3d0f55b  CURATOR-549 - optimization: if a node that the cache knows about gets a change event, there's no need to query for children if the Cversion hasn't changed
     new a34beef  CURATOR-549

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (65be194)
            \
             N -- N -- N   refs/heads/CURATOR-549-zk36-persistent-watcher-bridge (a34beef)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[curator] 01/01: CURATOR-549

Posted by ra...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-bridge
in repository https://gitbox.apache.org/repos/asf/curator.git

commit a34beef443ad0fc595cb1e693086696a5d963f48
Author: randgalt <ra...@apache.org>
AuthorDate: Sun Nov 3 17:53:02 2019 -0500

    CURATOR-549
    
    Creates a simple bridge that, when using ZK 3.6.0, creates a CuratorCache and, for earlier versions, creates a TreeCache. The curator-test-zk35 module ensures that both code paths are tested.
---
 .../cache/CompatibleCuratorCacheBridge.java        | 126 +++++++++++++++++
 .../recipes/cache/CuratorCacheBridge.java          |  59 ++++++++
 .../recipes/cache/CuratorCacheBuilder.java         |   8 ++
 .../recipes/cache/CuratorCacheBuilderImpl.java     |  19 +++
 .../framework/recipes/cache/CuratorCacheImpl.java  |   9 +-
 .../recipes/cache/CuratorCacheListenerBuilder.java |  22 ++-
 .../cache/CuratorCacheListenerBuilderImpl.java     |  19 ++-
 .../recipes/cache/CuratorCacheStorage.java         |   1 +
 .../cache/PathChildrenCacheListenerWrapper.java    |  11 +-
 .../curator/framework/recipes/cache/TreeCache.java |  36 +++--
 .../framework/recipes/nodes/GroupMember.java       |  43 +++---
 .../recipes/cache/TestCuratorCacheWrappers.java    |   2 +-
 .../framework/recipes/nodes/TestGroupMember.java   |   2 +
 curator-test-zk35/pom.xml                          |  46 ++++++
 curator-x-async/pom.xml                            |  15 ++
 .../details/CachedModeledFrameworkImpl.java        |  11 +-
 .../x/async/modeled/details/ModeledCacheImpl.java  |  77 +++++++----
 .../modeled/details/ModeledFrameworkImpl.java      |   4 +-
 .../async/modeled/TestCachedModeledFramework.java  |   2 +
 .../x/async/modeled/TestModeledFrameworkBase.java  |   4 +-
 curator-x-discovery/pom.xml                        |  15 ++
 .../apache/curator/x/discovery/ServiceCache.java   |  20 ++-
 .../curator/x/discovery/ServiceProvider.java       |  24 +++-
 .../discovery/details/ServiceCacheBuilderImpl.java |   6 +-
 .../x/discovery/details/ServiceCacheImpl.java      | 154 ++++++++++++---------
 .../x/discovery/details/ServiceDiscoveryImpl.java  |  65 ++++-----
 .../x/discovery/details/ServiceProviderImpl.java   |   8 ++
 .../curator/x/discovery/TestServiceCache.java      |   2 +
 .../x/discovery/details/TestServiceCacheRace.java  |   2 +
 .../x/discovery/details/TestServiceDiscovery.java  |   2 +
 .../x/discovery/details/TestWatchedInstances.java  |   2 +
 pom.xml                                            |  14 ++
 32 files changed, 632 insertions(+), 198 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
new file mode 100644
index 0000000..f7c1428
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CompatibleCuratorCacheBridge.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Sets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.StandardListenerManager;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.stream.Stream;
+
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+
+@SuppressWarnings("deprecation")
+class CompatibleCuratorCacheBridge implements CuratorCacheBridge, TreeCacheListener // CuratorCacheBridge backed by a TreeCache; TreeCache events are re-published as CuratorCacheListener events
+{
+    private final TreeCache cache; // the wrapped cache; built once in the constructor
+    private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard(); // listeners registered through listenable()
+
+    CompatibleCuratorCacheBridge(CuratorFramework client, String path, CuratorCache.Options[] optionsArg, Executor executor, boolean cacheData)
+    {
+        Set<CuratorCache.Options> options = (optionsArg != null) ? Sets.newHashSet(optionsArg) : Collections.emptySet(); // a null options array means "no options"
+        TreeCache.Builder builder = TreeCache.newBuilder(client, path).setCacheData(cacheData);
+        if ( options.contains(CuratorCache.Options.SINGLE_NODE_CACHE) )
+        {
+            builder.setMaxDepth(0); // depth 0 restricts the TreeCache to the root node
+        }
+        if ( options.contains(CuratorCache.Options.COMPRESSED_DATA) )
+        {
+            builder.setDataIsCompressed(true);
+        }
+        if ( executor != null ) // when null, the executor is left unset on the builder
+        {
+            builder.setExecutor(executor);
+        }
+        cache = builder.build();
+    }
+
+    @Override
+    public void start()
+    {
+        try
+        {
+            cache.getListenable().addListener(this); // subscribe this bridge before the cache is started
+
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e); // start() declares no checked exceptions, so wrap and rethrow
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        cache.close(); // closing the bridge closes the wrapped TreeCache
+    }
+
+    @Override
+    public Listenable<CuratorCacheListener> listenable()
+    {
+        return listenerManager;
+    }
+
+    @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        Map<String, ChildData> currentChildren = cache.getCurrentChildren(fromParent);
+        if ( currentChildren == null ) // a null result is reported to callers as an empty stream
+        {
+            return Stream.empty();
+        }
+        return currentChildren.values().stream();
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+    {
+        switch ( event.getType() ) // event types without a case below are not forwarded
+        {
+            case NODE_ADDED: // forwarded as NODE_CREATED with no old data
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CREATED, null, event.getData()));
+                break;
+            }
+
+            case NODE_REMOVED: // forwarded as NODE_DELETED; the event's data is passed as the old data
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_DELETED, event.getData(), null));
+                break;
+            }
+
+            case NODE_UPDATED: // forwarded as NODE_CHANGED; old data is passed as null, only the event's data is supplied
+            {
+                listenerManager.forEach(listener -> listener.event(NODE_CHANGED, null, event.getData()));
+                break;
+            }
+
+            case INITIALIZED: // forwarded to CuratorCacheListener.initialized()
+            {
+                listenerManager.forEach(CuratorCacheListener::initialized);
+                break;
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
new file mode 100644
index 0000000..3e329fc
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridge.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.listen.Listenable;
+import java.io.Closeable;
+import java.util.stream.Stream;
+
+/**
+ * A facade that uses {@link org.apache.curator.framework.recipes.cache.CuratorCache} if
+ * persistent watches are available or a {@link org.apache.curator.framework.recipes.cache.TreeCache}
+ * otherwise.
+ */
+@SuppressWarnings("deprecation") // NOTE(review): presumably needed because the Javadoc above links to TreeCache - confirm
+public interface CuratorCacheBridge extends Closeable
+{
+    /**
+     * Start the cache. This will cause a complete refresh from the cache's root node and generate
+     * events for all nodes found, etc.
+     */
+    void start();
+
+    /**
+     * Close the cache, stop responding to events, etc.
+     */
+    @Override
+    void close(); // redeclared without "throws IOException", so callers need no checked-exception handling
+
+    /**
+     * Return the listener container so that listeners can be registered to be notified of changes to the cache.
+     *
+     * @return listener container
+     */
+    Listenable<CuratorCacheListener> listenable();
+
+    /**
+     * Return a stream over the storage entries that are the immediate children of the given node.
+     *
+     * @param fromParent the parent node - determines the children returned in the stream
+     * @return stream over entries
+     */
+    Stream<ChildData> streamImmediateChildren(String fromParent);
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
index 35a5f26..a249d60 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilder.java
@@ -60,4 +60,12 @@ public interface CuratorCacheBuilder
      * @return new Curator Cache
      */
     CuratorCache build();
+
+    /**
+     * Return a new bridge cache based on the builder methods that have been called.
+     *
+     * @param cacheData if true, keep the data bytes cached. If false, clear them after sending notifications
+     * @return new bridge cache
+     */
+    CuratorCacheBridge buildBridge(boolean cacheData);
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
index 9f9e03d..8d7fa4b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBuilderImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.Compatibility;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 
@@ -69,6 +71,23 @@ class CuratorCacheBuilderImpl implements CuratorCacheBuilder
     @Override
     public CuratorCache build()
     {
+        return internalBuild(storage);
+    }
+
+    @Override
+    public CuratorCacheBridge buildBridge(boolean cacheData)
+    {
+        Preconditions.checkArgument(storage == null, "Custom CuratorCacheStorage is not supported by the TreeCache bridge");
+        if ( Compatibility.hasPersistentWatchers() )
+        {
+            return internalBuild(cacheData ? null : CuratorCacheStorage.bytesNotCached());
+        }
+        Preconditions.checkArgument(exceptionHandler == null, "ExceptionHandler is not supported by the TreeCache bridge");
+        return new CompatibleCuratorCacheBridge(client, path, options, executor, cacheData);
+    }
+
+    private CuratorCacheImpl internalBuild(CuratorCacheStorage storage)
+    {
         return new CuratorCacheImpl(client, storage, path, options, executor, exceptionHandler);
     }
 }
\ No newline at end of file
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index e6be71c..01faa39 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -40,12 +40,13 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 
-class CuratorCacheImpl implements CuratorCache
+class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
@@ -113,6 +114,12 @@ class CuratorCacheImpl implements CuratorCache
     }
 
     @Override
+    public Stream<ChildData> streamImmediateChildren(String fromParent)
+    {
+        return storage.streamImmediateChildren(fromParent);
+    }
+
+    @Override
     public Listenable<CuratorCacheListener> listenable()
     {
         return listenerManager;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
index c57e881..9381546 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilder.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.cache;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public interface CuratorCacheListenerBuilder
 {
@@ -88,9 +89,11 @@ public interface CuratorCacheListenerBuilder
      *
      * @param client the curator client
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @param basePath the path used as the root in the cache. Only events with a parent path matching this
+     *                 base path are sent to the listener
+     * @return this
      */
-    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener);
+    CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath);
 
     /**
      * Bridge listener. You can reuse old-style {@link org.apache.curator.framework.recipes.cache.TreeCacheListener}s
@@ -101,7 +104,7 @@ public interface CuratorCacheListenerBuilder
      *
      * @param client the curator client
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @return this
      */
     CuratorCacheListenerBuilder forTreeCache(CuratorFramework client, TreeCacheListener listener);
 
@@ -110,17 +113,28 @@ public interface CuratorCacheListenerBuilder
      * with CuratorCache.
      *
      * @param listener the listener to wrap
-     * @return a CuratorCacheListener that forwards to the given listener
+     * @return this
      */
     CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener);
 
     /**
      * Make the built listener so that it only becomes active once {@link CuratorCacheListener#initialized()} has been called.
      * i.e. changes that occur as the cache is initializing are not sent to the listener
+     *
+     * @return this
      */
     CuratorCacheListenerBuilder afterInitialized();
 
     /**
+     * Make the built listener so that it is only called for paths for which the given
+     * filter returns true.
+     *
+     * @param pathFilter path filter
+     * @return this
+     */
+    CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter);
+
+    /**
      * Build and return a new listener based on the methods that have been previously called
      *
      * @return new listener
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
index 4873868..479ba34 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheListenerBuilderImpl.java
@@ -20,14 +20,17 @@
 package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
 {
     private final List<CuratorCacheListener> listeners = new ArrayList<>();
     private boolean afterInitializedOnly = false;
+    private Predicate<String> pathFilter = __ -> true;
 
     @Override
     public CuratorCacheListenerBuilder forAll(CuratorCacheListener listener)
@@ -106,8 +109,9 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
     }
 
     @Override
-    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener)
+    public CuratorCacheListenerBuilder forPathChildrenCache(CuratorFramework client, PathChildrenCacheListener listener, String basePath)
     {
+        pathFilter = p -> ZKPaths.getPathAndNode(p).getPath().equals(basePath);
         listeners.add(new PathChildrenCacheListenerWrapper(client, listener));
         return this;
     }
@@ -134,6 +138,13 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
     }
 
     @Override
+    public CuratorCacheListenerBuilder withPathFilter(Predicate<String> pathFilter)
+    {
+        this.pathFilter = (pathFilter != null) ? (p -> (p == null) || pathFilter.test(p)) : (__ -> true);
+        return this;
+    }
+
+    @Override
     public CuratorCacheListener build()
     {
         List<CuratorCacheListener> copy = new ArrayList<>(listeners);
@@ -146,7 +157,11 @@ class CuratorCacheListenerBuilderImpl implements CuratorCacheListenerBuilder
             {
                 if ( isInitialized )
                 {
-                    copy.forEach(l -> l.event(type, oldData, data));
+                    ChildData filterData = (data != null) ? data : oldData;
+                    if ( pathFilter.test(filterData.getPath()) )
+                    {
+                        copy.forEach(l -> l.event(type, oldData, data));
+                    }
                 }
             }
 
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
index e809263..30f2b81 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheStorage.java
@@ -92,6 +92,7 @@ public interface CuratorCacheStorage
     /**
      * Return a stream over the storage entries that are the immediate children of the given node.
      *
+     * @param fromParent the parent node - determines the children returned in the stream
      * @return stream over entries
      */
     Stream<ChildData> streamImmediateChildren(String fromParent);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
index a9123c1..7e9730c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheListenerWrapper.java
@@ -39,19 +39,19 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
         {
             case NODE_CREATED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_ADDED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data));
                 break;
             }
 
             case NODE_CHANGED:
             {
-                sendEvent(data, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data));
                 break;
             }
 
             case NODE_DELETED:
             {
-                sendEvent(oldData, PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, oldData));
                 break;
             }
         }
@@ -60,12 +60,11 @@ class PathChildrenCacheListenerWrapper implements CuratorCacheListener
     @Override
     public void initialized()
     {
-        sendEvent(null, PathChildrenCacheEvent.Type.INITIALIZED);
+        sendEvent(new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null));
     }
 
-    private void sendEvent(ChildData node, PathChildrenCacheEvent.Type type)
+    private void sendEvent(PathChildrenCacheEvent event)
     {
-        PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, node);
         try
         {
             listener.childEvent(client, event);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index e2f3a8b..121e72c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -52,6 +52,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -78,6 +79,7 @@ import static org.apache.curator.utils.PathUtils.validatePath;
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final Executor executor;
     private final boolean createParentNodes;
     private final boolean disableZkWatches;
     private final TreeCacheSelector selector;
@@ -89,6 +91,7 @@ public class TreeCache implements Closeable
         private boolean cacheData = true;
         private boolean dataIsCompressed = false;
         private ExecutorService executorService = null;
+        private Executor executor = null;
         private int maxDepth = Integer.MAX_VALUE;
         private boolean createParentNodes = false;
         private boolean disableZkWatches = false;
@@ -105,12 +108,12 @@ public class TreeCache implements Closeable
          */
         public TreeCache build()
         {
-            ExecutorService executor = executorService;
-            if ( executor == null )
+            ExecutorService localExecutorService = executorService;
+            if ( (localExecutorService == null) && (executor == null) )
             {
-                executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
+                localExecutorService = Executors.newSingleThreadExecutor(defaultThreadFactory);
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, localExecutorService, executor, createParentNodes, disableZkWatches, selector);
         }
 
         /**
@@ -149,6 +152,15 @@ public class TreeCache implements Closeable
         }
 
         /**
+         * Sets the executor to publish events; a default executor will be created if not specified.
+         */
+        public Builder setExecutor(Executor executor)
+        {
+            this.executor = checkNotNull(executor);
+            return this;
+        }
+
+        /**
          * Sets the maximum depth to explore/watch.  A {@code maxDepth} of {@code 0} will watch only
          * the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the
          * root node and its immediate children (kind of like {@link PathChildrenCache}.
@@ -564,7 +576,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), null, false, false, new DefaultTreeCacheSelector());
     }
 
     /**
@@ -573,12 +585,14 @@ public class TreeCache implements Closeable
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the TreeCache's background thread
+     * @param executor          executor to use for the TreeCache's background thread
      * @param createParentNodes true to create parent nodes as containers
      * @param disableZkWatches true to disable Zookeeper watches
      * @param selector         the selector to use
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, final Executor executor, boolean createParentNodes, boolean disableZkWatches, TreeCacheSelector selector)
     {
+        this.executor = executor;
         this.createParentNodes = createParentNodes;
         this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
         this.root = new TreeNode(validatePath(path), null);
@@ -588,7 +602,7 @@ public class TreeCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
-        this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
+        this.executorService = executorService;
     }
 
     /**
@@ -623,7 +637,10 @@ public class TreeCache implements Closeable
             client.removeWatchers();
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
-            executorService.shutdown();
+            if ( executorService != null )
+            {
+                executorService.shutdown();
+            }
             try
             {
                 root.wasDeleted();
@@ -857,8 +874,9 @@ public class TreeCache implements Closeable
     {
         if ( treeState.get() != TreeState.CLOSED )
         {
+            Executor localExecutor = (executorService != null) ? executorService : executor;
             LOG.debug("publishEvent: {}", event);
-            executorService.submit(new Runnable()
+            localExecutor.execute(new Runnable()
             {
                 @Override
                 public void run()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
index 8cd1f65..b80c71e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -20,16 +20,19 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import java.io.Closeable;
+import java.util.AbstractMap;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Group membership management. Adds this instance into a group and
@@ -37,8 +40,9 @@ import java.util.Map;
  */
 public class GroupMember implements Closeable
 {
-    private final PersistentEphemeralNode pen;
-    private final PathChildrenCache cache;
+    private final PersistentNode pen;
+    private final CuratorCacheBridge cache;
+    private final String membershipPath;
     private final String thisId;
 
     /**
@@ -59,9 +63,10 @@ public class GroupMember implements Closeable
      */
     public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
     {
+        this.membershipPath = membershipPath;
         this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null");
 
-        cache = newPathChildrenCache(client, membershipPath);
+        cache = newCache(client, membershipPath);
         pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload);
     }
 
@@ -119,19 +124,11 @@ public class GroupMember implements Closeable
      */
     public Map<String, byte[]> getCurrentMembers()
     {
-        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
-        boolean thisIdAdded = false;
-        for ( ChildData data : cache.getCurrentData() )
-        {
-            String id = idFromPath(data.getPath());
-            thisIdAdded = thisIdAdded || id.equals(thisId);
-            builder.put(id, data.getData());
-        }
-        if ( !thisIdAdded )
-        {
-            builder.put(thisId, pen.getData());   // this instance is always a member
-        }
-        return builder.build();
+        Map<String, byte[]> map = new HashMap<>();
+        map.put(thisId, pen.getData());
+        return cache.streamImmediateChildren(membershipPath)
+            .map(data -> new AbstractMap.SimpleEntry<>(idFromPath(data.getPath()), data.getData()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k, v) -> v, () -> map));
     }
 
     /**
@@ -145,13 +142,13 @@ public class GroupMember implements Closeable
         return ZKPaths.getNodeFromPath(path);
     }
 
-    protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
+    protected PersistentNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload)
     {
-        return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload);
+        return new PersistentNode(client, CreateMode.EPHEMERAL, false, ZKPaths.makePath(membershipPath, thisId), payload);
     }
 
-    protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath)
+    protected CuratorCacheBridge newCache(CuratorFramework client, String membershipPath)
     {
-        return new PathChildrenCache(client, membershipPath, true);
+        return CuratorCache.builder(client, membershipPath).buildBridge(true);
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
index 4a75acf..edfecb5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCacheWrappers.java
@@ -56,7 +56,7 @@ public class TestCuratorCacheWrappers extends CuratorTestBase
                         events.offer(event.getType());
                     }
                 };
-                cache.listenable().addListener(builder().forPathChildrenCache(client, listener).build());
+                cache.listenable().addListener(builder().forPathChildrenCache(client, listener, "/test").build());
                 cache.start();
 
                 client.create().forPath("/test/one", "hey there".getBytes());
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
index 2da051f..85a2097 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -25,11 +25,13 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Map;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestGroupMember extends BaseClassForTests
 {
     // NOTE - don't need many tests as this class is just a wrapper around two existing recipes
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 5803893..4ca8356 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -89,6 +89,18 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <exclusions>
                 <exclusion>
@@ -114,6 +126,32 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-async</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-x-discovery</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>test</scope>
@@ -124,6 +162,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -151,6 +195,8 @@
                     <dependenciesToScan>
                         <dependency>org.apache.curator:curator-framework</dependency>
                         <dependency>org.apache.curator:curator-recipes</dependency>
+                        <dependency>org.apache.curator:curator-x-async</dependency>
+                        <dependency>org.apache.curator:curator-x-discovery</dependency>
                     </dependenciesToScan>
                     <groups>zk35,zk35Compatibility</groups>
                     <excludedGroups>zk36</excludedGroups>
diff --git a/curator-x-async/pom.xml b/curator-x-async/pom.xml
index 5ffd774..a32dbd3 100644
--- a/curator-x-async/pom.xml
+++ b/curator-x-async/pom.xml
@@ -49,4 +49,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
index c897b4e..2dd5625 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/CachedModeledFrameworkImpl.java
@@ -37,7 +37,6 @@ import org.apache.zookeeper.server.DataTree;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -47,18 +46,16 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
 {
     private final ModeledFramework<T> client;
     private final ModeledCacheImpl<T> cache;
-    private final Executor executor;
 
     CachedModeledFrameworkImpl(ModeledFramework<T> client, ExecutorService executor)
     {
-        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor), executor);
+        this(client, new ModeledCacheImpl<>(client.unwrap().unwrap(), client.modelSpec(), executor));
     }
 
-    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache, Executor executor)
+    private CachedModeledFrameworkImpl(ModeledFramework<T> client, ModeledCacheImpl<T> cache)
     {
         this.client = client;
         this.cache = cache;
-        this.executor = executor;
     }
 
     @Override
@@ -118,7 +115,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> child(Object child)
     {
-        return new CachedModeledFrameworkImpl<>(client.child(child), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.child(child), cache);
     }
 
     @Override
@@ -130,7 +127,7 @@ class CachedModeledFrameworkImpl<T> implements CachedModeledFramework<T>
     @Override
     public CachedModeledFramework<T> withPath(ZPath path)
     {
-        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache, executor);
+        return new CachedModeledFrameworkImpl<>(client.withPath(path), cache);
     }
 
     @Override
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
index b95e92d..6fe866d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledCacheImpl.java
@@ -19,34 +19,42 @@
 package org.apache.curator.x.async.modeled.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.ModeledCache;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
-import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.zookeeper.data.Stat;
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 {
-    private final TreeCache cache;
+    private final CuratorCacheBridge cache;
     private final Map<ZPath, Entry<T>> entries = new ConcurrentHashMap<>();
     private final ModelSerializer<T> serializer;
     private final ListenerContainer<ModeledCacheListener<T>> listenerContainer = new ListenerContainer<>();
     private final ZPath basePath;
+    private final CuratorFramework client;
+    private final Set<CreateOption> options;
 
     private static final class Entry<T>
     {
@@ -62,6 +70,7 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     ModeledCacheImpl(CuratorFramework client, ModelSpec<T> modelSpec, ExecutorService executor)
     {
+        this.client = client;
         if ( !modelSpec.path().isResolved() && !modelSpec.path().isRoot() && modelSpec.path().parent().isResolved() )
         {
             modelSpec = modelSpec.parent(); // i.e. the last item is a parameter
@@ -69,19 +78,40 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
         basePath = modelSpec.path();
         this.serializer = modelSpec.serializer();
-        cache = TreeCache.newBuilder(client, basePath.fullPath())
-            .setCacheData(false)
-            .setDataIsCompressed(modelSpec.createOptions().contains(CreateOption.compress))
-            .setExecutor(executor)
-            .setCreateParentNodes(modelSpec.createOptions().contains(CreateOption.createParentsIfNeeded) || modelSpec.createOptions().contains(CreateOption.createParentsAsContainers))
-            .build();
+        options = modelSpec.createOptions();
+        CuratorCacheBuilder builder = CuratorCache.builder(client, basePath.fullPath());
+        if ( modelSpec.createOptions().contains(CreateOption.compress) )
+        {
+            builder.withOptions(CuratorCache.Options.COMPRESSED_DATA);
+        }
+        if ( executor != null )
+        {
+            builder.withExecutor(executor);
+        }
+        cache = builder.buildBridge(false);
     }
 
     public void start()
     {
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forTreeCache(client, this)
+            .withPathFilter(path -> !path.equals(basePath.fullPath()))
+            .build();
         try
         {
-            cache.getListenable().addListener(this);
+            if ( options.contains(CreateOption.createParentsIfNeeded) || options.contains(CreateOption.createParentsAsContainers) )
+            {
+                if ( options.contains(CreateOption.createParentsAsContainers) )
+                {
+                    new EnsureContainers(client, basePath.fullPath()).ensure();
+                }
+                else
+                {
+                    ZKPaths.mkdirs(client.getZookeeperClient().getZooKeeper(), basePath.fullPath(), false, null, false);
+                }
+            }
+
+            cache.listenable().addListener(listener);
             cache.start();
         }
         catch ( Exception e )
@@ -92,7 +122,6 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
 
     public void close()
     {
-        cache.getListenable().removeListener(this);
         cache.close();
         entries.clear();
     }
@@ -159,16 +188,13 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_UPDATED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
+            byte[] bytes = event.getData().getData();
+            if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
             {
-                byte[] bytes = event.getData().getData();
-                if ( (bytes != null) && (bytes.length > 0) )    // otherwise it's probably just a parent node being created
-                {
-                    T model = serializer.deserialize(bytes);
-                    entries.put(path, new Entry<>(event.getData().getStat(), model));
-                    ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
-                    accept(type, path, event.getData().getStat(), model);
-                }
+                T model = serializer.deserialize(bytes);
+                entries.put(path, new Entry<>(event.getData().getStat(), model));
+                ModeledCacheListener.Type type = (event.getType() == TreeCacheEvent.Type.NODE_ADDED) ? ModeledCacheListener.Type.NODE_ADDED : ModeledCacheListener.Type.NODE_UPDATED;
+                accept(type, path, event.getData().getStat(), model);
             }
             break;
         }
@@ -176,13 +202,10 @@ class ModeledCacheImpl<T> implements TreeCacheListener, ModeledCache<T>
         case NODE_REMOVED:
         {
             ZPath path = ZPath.parse(event.getData().getPath());
-            if ( !path.equals(basePath) )
-            {
-                Entry<T> entry = entries.remove(path);
-                T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
-                Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
-                accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
-            }
+            Entry<T> entry = entries.remove(path);
+            T model = (entry != null) ? entry.model : serializer.deserialize(event.getData().getData());
+            Stat stat = (entry != null) ? entry.stat : event.getData().getStat();
+            accept(ModeledCacheListener.Type.NODE_REMOVED, path, stat, model);
             break;
         }
 
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index dbbf3cb..799845f 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -113,14 +113,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     @Override
     public CachedModeledFramework<T> cached()
     {
-        return cached(ThreadUtils.newSingleThreadExecutor("CachedModeledFramework"));
+        return cached(null);
     }
 
     @Override
     public CachedModeledFramework<T> cached(ExecutorService executor)
     {
         Preconditions.checkState(!isWatched, "CachedModeledFramework cannot be used with watched instances as the internal cache would bypass the watchers.");
-        return new CachedModeledFrameworkImpl<>(this, Objects.requireNonNull(executor, "executor cannot be null"));
+        return new CachedModeledFrameworkImpl<>(this, executor);
     }
 
     @Override
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
index 2d33c13..830ee2b 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.curator.x.async.modeled.models.TestModel;
@@ -38,6 +39,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestCachedModeledFramework extends TestModeledFrameworkBase
 {
     @Test
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 61a4570..5660539 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -37,7 +37,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
     protected ModelSpec<TestNewerModel> newModelSpec;
     protected AsyncCuratorFramework async;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     @Override
     public void setup() throws Exception
     {
@@ -54,7 +54,7 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
     }
 
-    @AfterMethod
+    @AfterMethod(alwaysRun = true)
     @Override
     public void teardown() throws Exception
     {
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index 824231d..50b2dea 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -80,4 +80,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
index a122d69..270005e 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCache.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListener>, InstanceProvider<T>
 {
@@ -33,12 +34,25 @@ public interface ServiceCache<T> extends Closeable, Listenable<ServiceCacheListe
      *
      * @return the list
      */
-    public List<ServiceInstance<T>> getInstances();
+    List<ServiceInstance<T>> getInstances();
 
     /**
-     * The cache must be started before use
+     * The cache must be started before use. This method blocks while the internal
+     * cache is loaded.
      *
      * @throws Exception errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The cache must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
index f542ed3..d09885b 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceProvider.java
@@ -23,6 +23,7 @@ import org.apache.curator.x.discovery.details.InstanceProvider;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * The main API for Discovery. This class is essentially a facade over a {@link ProviderStrategy}
@@ -31,11 +32,24 @@ import java.util.Collection;
 public interface ServiceProvider<T> extends Closeable
 {
     /**
-     * The provider must be started before use
+     * The provider must be started before use. This method blocks while the internal
+     * cache is loaded.
      *
      * @throws Exception any errors
      */
-    public void start() throws Exception;
+    void start() throws Exception;
+
+    /**
+     * The provider must be started before use. This version returns immediately.
+     * Use the returned latch to block until the cache is loaded
+     *
+     * @return a latch that can be used to block until the cache is loaded
+     * @throws Exception errors
+     */
+    default CountDownLatch startImmediate() throws Exception
+    {
+        throw new UnsupportedOperationException();
+    }
 
     /**
      * Return an instance for a single use. <b>IMPORTANT: </b> users
@@ -44,7 +58,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return the instance to use
      * @throws Exception any errors
      */
-    public ServiceInstance<T> getInstance() throws Exception;
+    ServiceInstance<T> getInstance() throws Exception;
 
     /**
      * Return the current available set of instances <b>IMPORTANT: </b> users
@@ -53,7 +67,7 @@ public interface ServiceProvider<T> extends Closeable
      * @return all known instances
      * @throws Exception any errors
      */
-    public Collection<ServiceInstance<T>> getAllInstances() throws Exception;
+    Collection<ServiceInstance<T>> getAllInstances() throws Exception;
 
     /**
      * Take note of an error connecting to the given instance. The instance will potentially
@@ -61,7 +75,7 @@ public interface ServiceProvider<T> extends Closeable
      *
      * @param instance instance that had an error
      */
-    public void noteError(ServiceInstance<T> instance);
+    void noteError(ServiceInstance<T> instance);
 
     /**
      * Close the provider. Note: it's the provider's responsibility to close any caches it manages
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index 8922233..501e289 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -47,13 +47,13 @@ class ServiceCacheBuilderImpl<T> implements ServiceCacheBuilder<T>
     @Override
     public ServiceCache<T> build()
     {
-        if (executorService != null)
+        if (threadFactory != null)
         {
-            return new ServiceCacheImpl<T>(discovery, name, executorService);
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
         }
         else
         {
-            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
         }
     }
 
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index d1a31ad..6ff68f1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBuilder;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +50,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final CuratorCacheBridge cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+    private String path;
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -73,12 +78,24 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(executorService, "executorService cannot be null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
-        cache.getListenable().addListener(this);
+        path = discovery.pathForName(name);
+
+        CuratorCacheBuilder builder = CuratorCache.builder(discovery.getClient(), path);
+        if ( executorService != null )
+        {
+            builder.withExecutor(executorService::submit);
+        }
+        cache = builder.buildBridge(false);
+
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .forPathChildrenCache(discovery.getClient(), this, path)
+            .forInitialized(this::initialized)
+            .withPathFilter(p -> !p.equals(path))
+            .build();
+        cache.listenable().addListener(listener);
     }
 
     @Override
@@ -94,11 +111,20 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void start() throws Exception
     {
+        startImmediate().await();
+    }
+
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        cache.start(true);
+        new EnsureContainers(discovery.getClient(), path).ensure();
+
+        cache.start();
         if ( debugStartLatch != null )
         {
+            initializedLatch.await();
             debugStartLatch.countDown();
             debugStartLatch = null;
         }
@@ -108,14 +134,7 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
             debugStartWaitLatch = null;
         }
 
-        for ( ChildData childData : cache.getCurrentData() )
-        {
-            if ( childData.getData() != null )  // else already processed by the cache listener
-            {
-                addInstance(childData, true);
-            }
-        }
-        discovery.cacheOpened(this);
+        return initializedLatch;
     }
 
     @Override
@@ -123,18 +142,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -166,39 +182,36 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
-            {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            addInstance(event.getData());
+            notifyListeners = true;
+            break;
+        }
 
-            case CHILD_REMOVED:
-            {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_REMOVED:
+        {
+            instances.remove(instanceIdFromData(event.getData()));
+            notifyListeners = true;
+            break;
+        }
         }
 
-        if ( notifyListeners )
+        if ( notifyListeners && (initializedLatch.getCount() == 0) )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -207,18 +220,23 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         return ZKPaths.getNodeFromPath(childData.getPath());
     }
 
-    private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
+    private void addInstance(ChildData childData)
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
-        if ( onlyIfAbsent )
+        try
         {
-            instances.putIfAbsent(instanceId, serviceInstance);
+            String instanceId = instanceIdFromData(childData);
+            ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+            instances.put(instanceId, serviceInstance);
         }
-        else
+        catch ( Exception e )
         {
-            instances.put(instanceId, serviceInstance);
+            throw new RuntimeException(e);
         }
-        cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
+    }
+
+    private void initialized()
+    {
+        discovery.cacheOpened(this);
+        initializedLatch.countDown();
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 476705c..13ae887 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -26,8 +26,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableUtils;
@@ -92,7 +93,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private static class Entry<T>
     {
         private volatile ServiceInstance<T> service;
-        private volatile NodeCache cache;
+        private volatile CuratorCacheBridge cache;
 
         private Entry(ServiceInstance<T> service)
         {
@@ -277,8 +278,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     @Override
     public ServiceCacheBuilder<T> serviceCacheBuilder()
     {
-        return new ServiceCacheBuilderImpl<T>(this)
-            .threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
+        return new ServiceCacheBuilderImpl<T>(this);
     }
 
     /**
@@ -458,52 +458,47 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
         }
     }
 
-    private NodeCache makeNodeCache(final ServiceInstance<T> instance)
+    private CuratorCacheBridge makeNodeCache(final ServiceInstance<T> instance)
     {
         if ( !watchInstances )
         {
             return null;
         }
 
-        final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
-        try
-        {
-            nodeCache.start(true);
-        }
-        catch ( InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            return null;
-        }
-        catch ( Exception e )
-        {
-            log.error("Could not start node cache for: " + instance, e);
-        }
-        NodeCacheListener listener = new NodeCacheListener()
-        {
-            @Override
-            public void nodeChanged() throws Exception
-            {
-                if ( nodeCache.getCurrentData() != null )
+        CuratorCacheBridge cache = CuratorCache.builder(client, pathForInstance(instance.getName(), instance.getId()))
+            .withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)
+            .buildBridge(false);
+        CuratorCacheListener listener = CuratorCacheListener.builder()
+            .afterInitialized()
+            .forAll((__, ___, data) -> {
+                if ( data != null )
                 {
-                    ServiceInstance<T> newInstance = serializer.deserialize(nodeCache.getCurrentData().getData());
-                    Entry<T> entry = services.get(newInstance.getId());
-                    if ( entry != null )
+                    try
                     {
-                        synchronized(entry)
+                        ServiceInstance<T> newInstance = serializer.deserialize(data.getData());
+                        Entry<T> entry = services.get(newInstance.getId());
+                        if ( entry != null )
                         {
-                            entry.service = newInstance;
+                            synchronized(entry)
+                            {
+                                entry.service = newInstance;
+                            }
                         }
                     }
+                    catch ( Exception e )
+                    {
+                        log.debug("Could not deserialize: " + data.getPath());
+                    }
                 }
                 else
                 {
                     log.warn("Instance data has been deleted for: " + instance);
                 }
-            }
-        };
-        nodeCache.getListenable().addListener(listener);
-        return nodeCache;
+            })
+            .build();
+        cache.listenable().addListener(listener);
+        cache.start();
+        return cache;
     }
 
     private void internalUnregisterService(final Entry<T> entry) throws Exception
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
index 2ab1434..a411f33 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceProviderImpl.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -76,6 +77,13 @@ public class ServiceProviderImpl<T> implements ServiceProvider<T>
         discovery.providerOpened(this);
     }
 
+    @Override
+    public CountDownLatch startImmediate() throws Exception
+    {
+        discovery.providerOpened(this);
+        return cache.startImmediate();
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index fda5c26..2a9d2d8 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -27,6 +27,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
index 06d63b9..e9a7956 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceCacheRace.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceCacheRace extends BaseClassForTests
 {
     private final Timing timing = new Timing();
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
index 54719a5..d86e193 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestServiceDiscovery extends BaseClassForTests
 {
     private static final Comparator<ServiceInstance<Void>> comparator = new Comparator<ServiceInstance<Void>>()
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
index 2d03c47..a96ff6f 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestWatchedInstances.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -35,6 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+@Test(groups = CuratorTestBase.zk35CompatibilityGroup)
 public class TestWatchedInstances extends BaseClassForTests
 {
     @Test
diff --git a/pom.xml b/pom.xml
index e223de6..00083c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,13 @@
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-discovery</artifactId>
+                <version>${project.version}</version>
             </dependency>
 
             <dependency>
@@ -400,6 +407,13 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-x-async</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-math</artifactId>
                 <version>${commons-math-version}</version>